View Javadoc

1   /**
2    * This file is part of Waarp Project.
3    * 
4    * Copyright 2009, Frederic Bregier, and individual contributors by the @author tags. See the
5    * COPYRIGHT.txt in the distribution for a full listing of individual contributors.
6    * 
7    * All Waarp Project is free software: you can redistribute it and/or modify it under the terms of
8    * the GNU General Public License as published by the Free Software Foundation, either version 3 of
9    * the License, or (at your option) any later version.
10   * 
11   * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even
12   * the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
13   * Public License for more details.
14   * 
15   * You should have received a copy of the GNU General Public License along with Waarp . If not, see
16   * <http://www.gnu.org/licenses/>.
17   */
18  package org.waarp.ftp.core.data.handler;
19  
20  import java.util.List;
21  
22  import io.netty.buffer.ByteBuf;
23  import io.netty.buffer.ByteBufAllocator;
24  import io.netty.buffer.Unpooled;
25  import io.netty.channel.ChannelHandlerContext;
26  import io.netty.handler.codec.ByteToMessageCodec;
27  
28  import org.waarp.common.exception.InvalidArgumentException;
29  import org.waarp.common.file.DataBlock;
30  import org.waarp.common.future.WaarpFuture;
31  import org.waarp.ftp.core.command.FtpArgumentCode.TransferMode;
32  import org.waarp.ftp.core.command.FtpArgumentCode.TransferStructure;
33  import org.waarp.ftp.core.config.FtpConfiguration;
34  import org.waarp.ftp.core.data.handler.FtpSeekAheadData.SeekAheadNoBackArrayException;
35  
36  /**
37   * First CODEC :<br>
38   * - encode : takes a {@link DataBlock} and transforms it to a ByteBuf<br>
39   * - decode : takes a ByteBuf and transforms it to a {@link DataBlock}<br>
40   * STREAM and BLOCK mode are implemented. COMPRESSED mode is not implemented.
41   * 
42   * @author Frederic Bregier
43   * 
44   */
45  public class FtpDataModeCodec extends ByteToMessageCodec<DataBlock> {
46      /*
47       * 3.4.1. STREAM MODE The data is transmitted as a stream of bytes. There is no restriction on
48       * the representation type used; record structures are allowed. In a record structured file EOR
49       * and EOF will each be indicated by a two-byte control code. The first byte of the control code
50       * will be all ones, the escape character. The second byte will have the low order bit on and
51       * zeros elsewhere for EOR and the second low order bit on for EOF; that is, the byte will have
52       * value 1 for EOR and value 2 for EOF. EOR and EOF may be indicated together on the last byte
53       * transmitted by turning both low order bits on (i.e., the value 3). If a byte of all ones was
54       * intended to be sent as data, it should be repeated in the second byte of the control code. If
55       * the structure is a file structure, the EOF is indicated by the sending host closing the data
56       * connection and all bytes are data bytes. 3.4.2. BLOCK MODE The file is transmitted as a
57       * series of data blocks preceded by one or more header bytes. The header bytes contain a count
58       * field, and descriptor code. The count field indicates the total length of the data block in
59       * bytes, thus marking the beginning of the next data block (there are no filler bits). The
60       * descriptor code defines: last block in the file (EOF) last block in the record (EOR), restart
61       * marker (see the Section on Error Recovery and Restart) or suspect data (i.e., the data being
62       * transferred is suspected of errors and is not reliable). This last code is NOT intended for
63       * error control within FTP. It is motivated by the desire of sites exchanging certain types of
64       * data (e.g., seismic or weather data) to send and receive all the data despite local errors
65       * (such as "magnetic tape read errors"), but to indicate in the transmission that certain
66       * portions are suspect). Record structures are allowed in this mode, and any representation
67       * type may be used. The header consists of the three bytes. Of the 24 bits of header
68       * information, the 16 low order bits shall represent byte count, and the 8 high order bits
69       * shall represent descriptor codes as shown below. Block Header
70       * +----------------+----------------+----------------+ | Descriptor | Byte Count | | 8 bits |
71       * 16 bits | +----------------+----------------+----------------+ The descriptor codes are
72       * indicated by bit flags in the descriptor byte. Four codes have been assigned, where each code
73       * number is the decimal value of the corresponding bit in the byte. Code Meaning 128 End of
74       * data block is EOR 64 End of data block is EOF 32 Suspected errors in data block 16 Data block
75       * is a restart marker With this encoding, more than one descriptor coded condition may exist
76       * for a particular block. As many bits as necessary may be flagged. The restart marker is
77       * embedded in the data stream as an integral number of 8-bit bytes representing printable
78       * characters in the language being used over the control connection (e.g., default--NVT-ASCII).
79       * <SP> (Space, in the appropriate language) must not be used WITHIN a restart marker. For
80       * example, to transmit a six-character marker, the following would be sent:
81       * +--------+--------+--------+ |Descrptr| Byte count | |code= 16| = 6 |
82       * +--------+--------+--------+ +--------+--------+--------+ | Marker | Marker | Marker | | 8
83       * bits | 8 bits | 8 bits | +--------+--------+--------+ +--------+--------+--------+ | Marker |
84       * Marker | Marker | | 8 bits | 8 bits | 8 bits | +--------+--------+--------+
85       */
86      /**
87       * Transfer Mode
88       */
89      private TransferMode mode = null;
90  
91      /**
92       * Structure Mode
93       */
94      private TransferStructure structure = null;
95  
96      /**
97       * Ftp Data Block
98       */
99      private DataBlock dataBlock = null;
100 
101     /**
102      * Last byte for STREAM+RECORD
103      */
104     private int lastbyte = 0;
105 
106     /**
107      * Is the underlying DataNetworkHandler ready to receive block
108      */
109     private volatile boolean isReady = false;
110 
111     /**
112      * Blocking step between DataNetworkHandler and this Codec in order to wait that the
113      * DataNetworkHandler is ready
114      */
115     private final WaarpFuture codecLocked = new WaarpFuture();
116 
117     /**
118      * @param mode
119      * @param structure
120      */
121     public FtpDataModeCodec(TransferMode mode, TransferStructure structure) {
122         super();
123         this.mode = mode;
124         this.structure = structure;
125     }
126 
127     /**
128      * Inform the Codec that DataNetworkHandler is ready (called from DataNetworkHandler after
129      * setCorrectCodec).
130      * 
131      */
132     public void setCodecReady() {
133         codecLocked.setSuccess();
134     }
135 
136     protected DataBlock decodeRecordStandard(ByteBuf buf, int length) {
137         ByteBuf newbuf = ByteBufAllocator.DEFAULT.buffer(length);
138         if (lastbyte == 0xFF) {
139             int nextbyte = buf.readByte();
140             if (nextbyte == 0xFF) {
141                 newbuf.writeByte((byte) (lastbyte & 0xFF));
142             } else {
143                 if (nextbyte == 1) {
144                     dataBlock.setEOR(true);
145                 } else if (nextbyte == 2) {
146                     dataBlock.setEOF(true);
147                 } else if (nextbyte == 3) {
148                     dataBlock.setEOR(true);
149                     dataBlock.setEOF(true);
150                 }
151                 lastbyte = 0;
152             }
153         }
154         try {
155             while (true) {
156                 lastbyte = buf.readByte();
157                 if (lastbyte == 0xFF) {
158                     int nextbyte = buf.readByte();
159                     if (nextbyte == 0xFF) {
160                         newbuf.writeByte((byte) (lastbyte & 0xFF));
161                     } else {
162                         if (nextbyte == 1) {
163                             dataBlock.setEOR(true);
164                         } else if (nextbyte == 2) {
165                             dataBlock.setEOF(true);
166                         } else if (nextbyte == 3) {
167                             dataBlock.setEOR(true);
168                             dataBlock.setEOF(true);
169                         }
170                     }
171                 } else {
172                     newbuf.writeByte((byte) (lastbyte & 0xFF));
173                 }
174                 lastbyte = 0;
175             }
176         } catch (IndexOutOfBoundsException e) {
177             // End of read
178         }
179         dataBlock.setBlock(newbuf);
180         return dataBlock;
181     }
182 
183     protected DataBlock decodeRecord(ByteBuf buf, int length) {
184         FtpSeekAheadData sad = null;
185         try {
186             sad = new FtpSeekAheadData(buf);
187         } catch (SeekAheadNoBackArrayException e1) {
188             return decodeRecordStandard(buf, length);
189         }
190         ByteBuf newbuf = ByteBufAllocator.DEFAULT.buffer(length);
191         if (lastbyte == 0xFF) {
192             int nextbyte = sad.bytes[sad.pos++];
193             if (nextbyte == 0xFF) {
194                 newbuf.writeByte((byte) (lastbyte & 0xFF));
195             } else {
196                 if (nextbyte == 1) {
197                     dataBlock.setEOR(true);
198                 } else if (nextbyte == 2) {
199                     dataBlock.setEOF(true);
200                 } else if (nextbyte == 3) {
201                     dataBlock.setEOR(true);
202                     dataBlock.setEOF(true);
203                 }
204                 lastbyte = 0;
205             }
206         }
207         try {
208             while (sad.pos < sad.limit) {
209                 lastbyte = sad.bytes[sad.pos++];
210                 if (lastbyte == 0xFF) {
211                     int nextbyte = sad.bytes[sad.pos++];
212                     if (nextbyte == 0xFF) {
213                         newbuf.writeByte((byte) (lastbyte & 0xFF));
214                     } else {
215                         if (nextbyte == 1) {
216                             dataBlock.setEOR(true);
217                         } else if (nextbyte == 2) {
218                             dataBlock.setEOF(true);
219                         } else if (nextbyte == 3) {
220                             dataBlock.setEOR(true);
221                             dataBlock.setEOF(true);
222                         }
223                     }
224                 } else {
225                     newbuf.writeByte((byte) (lastbyte & 0xFF));
226                 }
227                 lastbyte = 0;
228             }
229         } catch (IndexOutOfBoundsException e) {
230             // End of read
231         }
232         sad.setReadPosition(0);
233         dataBlock.setBlock(newbuf);
234         return dataBlock;
235     }
236 
237     @Override
238     protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
239         // First test if the connection is fully ready (block might be
240         // transfered
241         // by client before connection is ready)
242         if (!isReady) {
243             if (!codecLocked.await(FtpConfiguration.getDATATIMEOUTCON())) {
244                 throw new InvalidArgumentException("Codec not unlocked while should be");
245             }
246             isReady = true;
247         }
248         if (buf.readableBytes() == 0) {
249             return;
250         }
251         // If STREAM Mode, no task to do, just next filter
252         if (mode == TransferMode.STREAM) {
253             dataBlock = new DataBlock();
254             if (structure != TransferStructure.RECORD) {
255                 ByteBuf newbuf = Unpooled.wrappedBuffer(buf);
256                 buf.readerIndex(buf.readableBytes());
257                 newbuf.retain();
258                 dataBlock.setBlock(newbuf);
259                 out.add(dataBlock);
260                 return;
261             }
262             // Except if RECORD Structure!
263             int length = buf.readableBytes();
264             out.add(decodeRecord(buf, length));
265             return;
266         } else if (mode == TransferMode.BLOCK) {
267             // Now we are in BLOCK Mode
268             // Make sure if the length field was received.
269             if (buf.readableBytes() < 3) {
270                 // The length field was not received yet - return null.
271                 // This method will be invoked again when more packets are
272                 // received and appended to the buffer.
273                 return;
274             }
275 
276             // The length field is in the buffer.
277 
278             // Mark the current buffer position before reading the length field
279             // because the whole frame might not be in the buffer yet.
280             // We will reset the buffer position to the marked position if
281             // there's not enough bytes in the buffer.
282             buf.markReaderIndex();
283 
284             if (dataBlock == null) {
285                 dataBlock = new DataBlock();
286             }
287             // Read the descriptor
288             dataBlock.setDescriptor(buf.readByte());
289 
290             // Read the length field.
291             byte upper = buf.readByte();
292             byte lower = buf.readByte();
293             dataBlock.setByteCount(upper, lower);
294 
295             // Make sure if there's enough bytes in the buffer.
296             if (buf.readableBytes() < dataBlock.getByteCount()) {
297                 // The whole bytes were not received yet - return null.
298                 // This method will be invoked again when more packets are
299                 // received and appended to the buffer.
300 
301                 // Reset to the marked position to read the length field again
302                 // next time.
303                 buf.resetReaderIndex();
304 
305                 return;
306             }
307             if (dataBlock.getByteCount() > 0) {
308                 // There's enough bytes in the buffer. Read it.
309                 dataBlock.setBlock(buf.readBytes(dataBlock.getByteCount()));
310             }
311             DataBlock returnDataBlock = dataBlock;
312             // Free the datablock for next frame
313             dataBlock = null;
314             // Successfully decoded a frame. Return the decoded frame.
315             out.add(returnDataBlock);
316             return;
317         }
318         // Type unimplemented
319         throw new InvalidArgumentException("Mode unimplemented: " + mode.name());
320     }
321 
322     protected ByteBuf encodeRecordStandard(DataBlock msg, ByteBuf buffer) {
323         ByteBuf newbuf = ByteBufAllocator.DEFAULT.buffer(msg.getByteCount());
324         int newbyte = 0;
325         try {
326             while (true) {
327                 newbyte = buffer.readByte();
328                 if (newbyte == 0xFF) {
329                     newbuf.writeByte((byte) (newbyte & 0xFF));
330                 }
331                 newbuf.writeByte((byte) (newbyte & 0xFF));
332             }
333         } catch (IndexOutOfBoundsException e) {
334             // end of read
335         }
336         int value = 0;
337         if (msg.isEOF()) {
338             value += 2;
339         }
340         if (msg.isEOR()) {
341             value += 1;
342         }
343         if (value > 0) {
344             newbuf.writeByte((byte) 0xFF);
345             newbuf.writeByte((byte) (value & 0xFF));
346         }
347         msg.clear();
348         return newbuf;
349     }
350 
351     protected ByteBuf encodeRecord(DataBlock msg, ByteBuf buffer) {
352         FtpSeekAheadData sad = null;
353         try {
354             sad = new FtpSeekAheadData(buffer);
355         } catch (SeekAheadNoBackArrayException e1) {
356             return encodeRecordStandard(msg, buffer);
357         }
358         ByteBuf newbuf = ByteBufAllocator.DEFAULT.buffer(msg.getByteCount());
359         int newbyte = 0;
360         try {
361             while (sad.pos < sad.limit) {
362                 newbyte = sad.bytes[sad.pos++];
363                 if (newbyte == 0xFF) {
364                     newbuf.writeByte((byte) (newbyte & 0xFF));
365                 }
366                 newbuf.writeByte((byte) (newbyte & 0xFF));
367             }
368         } catch (IndexOutOfBoundsException e) {
369             // end of read
370         }
371         int value = 0;
372         if (msg.isEOF()) {
373             value += 2;
374         }
375         if (msg.isEOR()) {
376             value += 1;
377         }
378         if (value > 0) {
379             newbuf.writeByte((byte) 0xFF);
380             newbuf.writeByte((byte) (value & 0xFF));
381         }
382         msg.clear();
383         sad.setReadPosition(0);
384         return newbuf;
385     }
386 
387     /**
388      * Encode a DataBlock in the correct format for Mode
389      * 
390      * @param msg
391      * @return the ByteBuf or null when the last block is already done
392      * @throws InvalidArgumentException
393      */
394     protected ByteBuf encode(DataBlock msg)
395             throws InvalidArgumentException {
396         if (msg.isCleared()) {
397             return null;
398         }
399         ByteBuf buffer = msg.getBlock();
400         if (mode == TransferMode.STREAM) {
401             // If record structure, special attention
402             if (structure == TransferStructure.RECORD) {
403                 return encodeRecord(msg, buffer);
404             }
405             msg.clear();
406             return buffer;
407         } else if (mode == TransferMode.BLOCK) {
408             int length = msg.getByteCount();
409             ByteBuf newbuf = ByteBufAllocator.DEFAULT.buffer(length > 0xFFFF ? 0xFFFF + 3 : length + 3);
410             byte[] header = new byte[3];
411             // Is there any data left
412             if (length == 0) {
413                 // It could be an empty block for EOR or EOF
414                 if (msg.isEOF() || msg.isEOR()) {
415                     header[0] = msg.getDescriptor();
416                     header[1] = 0;
417                     header[2] = 0;
418                     newbuf.writeBytes(header);
419                     // Next call will be the last one
420                     msg.clear();
421                     // return the last block
422                     return newbuf;
423                 }
424                 // This was the very last call
425                 msg.clear();
426                 // return the end of encode
427                 return null;
428             }
429             // Is this a Restart so only Markers
430             if (msg.isRESTART()) {
431                 header[0] = msg.getDescriptor();
432                 header[1] = msg.getByteCountUpper();
433                 header[2] = msg.getByteCountLower();
434                 newbuf.writeBytes(header);
435                 newbuf.writeBytes(msg.getByteMarkers());
436                 // Next call will be the last one
437                 msg.clear();
438                 // return the last block
439                 return newbuf;
440             }
441             // Work on sub block, ignoring descriptor since it is not the last
442             // one
443             if (length > 0xFFFF) {
444                 header[0] = 0;
445                 header[1] = (byte) 0xFF;
446                 header[2] = (byte) 0xFF;
447                 newbuf.writeBytes(header);
448                 // Now take the first 0xFFFF bytes from buffer
449                 newbuf.writeBytes(msg.getBlock(), 0xFFFF);
450                 length -= 0xFFFF;
451                 msg.setByteCount(length);
452                 // return the sub block
453                 return newbuf;
454             }
455             // Last final block, using the descriptor
456             header[0] = msg.getDescriptor();
457             header[1] = msg.getByteCountUpper();
458             header[2] = msg.getByteCountLower();
459             newbuf.writeBytes(header);
460             // real data
461             newbuf.writeBytes(buffer, length);
462             // Next call will be the last one
463             msg.clear();
464             // return the last block
465             return newbuf;
466         }
467         // Mode unimplemented
468         throw new InvalidArgumentException("Mode unimplemented: " + mode.name());
469     }
470 
471     /**
472      * @return the mode
473      */
474     public TransferMode getMode() {
475         return mode;
476     }
477 
478     /**
479      * @param mode
480      *            the mode to set
481      */
482     public void setMode(TransferMode mode) {
483         this.mode = mode;
484     }
485 
486     /**
487      * @return the structure
488      */
489     public TransferStructure getStructure() {
490         return structure;
491     }
492 
493     /**
494      * @param structure
495      *            the structure to set
496      */
497     public void setStructure(TransferStructure structure) {
498         this.structure = structure;
499     }
500 
501     @Override
502     protected void encode(ChannelHandlerContext ctx, DataBlock msg, ByteBuf out) throws Exception {
503         // First test if the connection is fully ready (block might be
504         // transfered
505         // by client before connection is ready)
506         if (!isReady) {
507             if (!codecLocked.await(FtpConfiguration.getDATATIMEOUTCON())) {
508                 throw new InvalidArgumentException("Codec not unlocked while should be");
509             }
510             isReady = true;
511         }
512         ByteBuf next = encode(msg);
513         // Could be splitten in several block
514         while (next != null) {
515             out.writeBytes(next);
516             next = encode(msg);
517         }
518     }
519 }