1 /**
2 * This file is part of Waarp Project.
3 *
4 * Copyright 2009, Frederic Bregier, and individual contributors by the @author tags. See the
5 * COPYRIGHT.txt in the distribution for a full listing of individual contributors.
6 *
7 * All Waarp Project is free software: you can redistribute it and/or modify it under the terms of
8 * the GNU General Public License as published by the Free Software Foundation, either version 3 of
9 * the License, or (at your option) any later version.
10 *
11 * Waarp is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even
12 * the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General
13 * Public License for more details.
14 *
15 * You should have received a copy of the GNU General Public License along with Waarp . If not, see
16 * <http://www.gnu.org/licenses/>.
17 */
18 package org.waarp.ftp.core.data.handler;
19
20 import java.util.List;
21
22 import io.netty.buffer.ByteBuf;
23 import io.netty.buffer.ByteBufAllocator;
24 import io.netty.buffer.Unpooled;
25 import io.netty.channel.ChannelHandlerContext;
26 import io.netty.handler.codec.ByteToMessageCodec;
27
28 import org.waarp.common.exception.InvalidArgumentException;
29 import org.waarp.common.file.DataBlock;
30 import org.waarp.common.future.WaarpFuture;
31 import org.waarp.ftp.core.command.FtpArgumentCode.TransferMode;
32 import org.waarp.ftp.core.command.FtpArgumentCode.TransferStructure;
33 import org.waarp.ftp.core.config.FtpConfiguration;
34 import org.waarp.ftp.core.data.handler.FtpSeekAheadData.SeekAheadNoBackArrayException;
35
36 /**
37 * First CODEC :<br>
38 * - encode : takes a {@link DataBlock} and transforms it to a ByteBuf<br>
39 * - decode : takes a ByteBuf and transforms it to a {@link DataBlock}<br>
40 * STREAM and BLOCK mode are implemented. COMPRESSED mode is not implemented.
41 *
42 * @author Frederic Bregier
43 *
44 */
45 public class FtpDataModeCodec extends ByteToMessageCodec<DataBlock> {
46 /*
47 * 3.4.1. STREAM MODE The data is transmitted as a stream of bytes. There is no restriction on
48 * the representation type used; record structures are allowed. In a record structured file EOR
49 * and EOF will each be indicated by a two-byte control code. The first byte of the control code
50 * will be all ones, the escape character. The second byte will have the low order bit on and
51 * zeros elsewhere for EOR and the second low order bit on for EOF; that is, the byte will have
52 * value 1 for EOR and value 2 for EOF. EOR and EOF may be indicated together on the last byte
53 * transmitted by turning both low order bits on (i.e., the value 3). If a byte of all ones was
54 * intended to be sent as data, it should be repeated in the second byte of the control code. If
55 * the structure is a file structure, the EOF is indicated by the sending host closing the data
56 * connection and all bytes are data bytes. 3.4.2. BLOCK MODE The file is transmitted as a
57 * series of data blocks preceded by one or more header bytes. The header bytes contain a count
58 * field, and descriptor code. The count field indicates the total length of the data block in
59 * bytes, thus marking the beginning of the next data block (there are no filler bits). The
60 * descriptor code defines: last block in the file (EOF) last block in the record (EOR), restart
61 * marker (see the Section on Error Recovery and Restart) or suspect data (i.e., the data being
62 * transferred is suspected of errors and is not reliable). This last code is NOT intended for
63 * error control within FTP. It is motivated by the desire of sites exchanging certain types of
64 * data (e.g., seismic or weather data) to send and receive all the data despite local errors
65 * (such as "magnetic tape read errors"), but to indicate in the transmission that certain
66 * portions are suspect). Record structures are allowed in this mode, and any representation
67 * type may be used. The header consists of the three bytes. Of the 24 bits of header
68 * information, the 16 low order bits shall represent byte count, and the 8 high order bits
69 * shall represent descriptor codes as shown below. Block Header
70 * +----------------+----------------+----------------+ | Descriptor | Byte Count | | 8 bits |
71 * 16 bits | +----------------+----------------+----------------+ The descriptor codes are
72 * indicated by bit flags in the descriptor byte. Four codes have been assigned, where each code
73 * number is the decimal value of the corresponding bit in the byte. Code Meaning 128 End of
74 * data block is EOR 64 End of data block is EOF 32 Suspected errors in data block 16 Data block
75 * is a restart marker With this encoding, more than one descriptor coded condition may exist
76 * for a particular block. As many bits as necessary may be flagged. The restart marker is
77 * embedded in the data stream as an integral number of 8-bit bytes representing printable
78 * characters in the language being used over the control connection (e.g., default--NVT-ASCII).
79 * <SP> (Space, in the appropriate language) must not be used WITHIN a restart marker. For
80 * example, to transmit a six-character marker, the following would be sent:
81 * +--------+--------+--------+ |Descrptr| Byte count | |code= 16| = 6 |
82 * +--------+--------+--------+ +--------+--------+--------+ | Marker | Marker | Marker | | 8
83 * bits | 8 bits | 8 bits | +--------+--------+--------+ +--------+--------+--------+ | Marker |
84 * Marker | Marker | | 8 bits | 8 bits | 8 bits | +--------+--------+--------+
85 */
86 /**
87 * Transfer Mode
88 */
89 private TransferMode mode = null;
90
91 /**
92 * Structure Mode
93 */
94 private TransferStructure structure = null;
95
96 /**
97 * Ftp Data Block
98 */
99 private DataBlock dataBlock = null;
100
101 /**
102 * Last byte for STREAM+RECORD
103 */
104 private int lastbyte = 0;
105
106 /**
107 * Is the underlying DataNetworkHandler ready to receive block
108 */
109 private volatile boolean isReady = false;
110
111 /**
112 * Blocking step between DataNetworkHandler and this Codec in order to wait that the
113 * DataNetworkHandler is ready
114 */
115 private final WaarpFuture codecLocked = new WaarpFuture();
116
/**
 * Creates a codec working with the given transfer mode and file structure.
 *
 * @param mode
 *            the transfer mode to apply
 * @param structure
 *            the file structure to apply
 */
public FtpDataModeCodec(TransferMode mode, TransferStructure structure) {
    // The parent no-argument constructor is invoked implicitly
    this.structure = structure;
    this.mode = mode;
}
126
/**
 * Inform the Codec that DataNetworkHandler is ready (called from DataNetworkHandler after
 * setCorrectCodec).
 *
 * This completes the internal future on which both decode and encode wait before handling
 * their first block.
 */
public void setCodecReady() {
    codecLocked.setSuccess();
}
135
136 protected DataBlock decodeRecordStandard(ByteBuf buf, int length) {
137 ByteBuf newbuf = ByteBufAllocator.DEFAULT.buffer(length);
138 if (lastbyte == 0xFF) {
139 int nextbyte = buf.readByte();
140 if (nextbyte == 0xFF) {
141 newbuf.writeByte((byte) (lastbyte & 0xFF));
142 } else {
143 if (nextbyte == 1) {
144 dataBlock.setEOR(true);
145 } else if (nextbyte == 2) {
146 dataBlock.setEOF(true);
147 } else if (nextbyte == 3) {
148 dataBlock.setEOR(true);
149 dataBlock.setEOF(true);
150 }
151 lastbyte = 0;
152 }
153 }
154 try {
155 while (true) {
156 lastbyte = buf.readByte();
157 if (lastbyte == 0xFF) {
158 int nextbyte = buf.readByte();
159 if (nextbyte == 0xFF) {
160 newbuf.writeByte((byte) (lastbyte & 0xFF));
161 } else {
162 if (nextbyte == 1) {
163 dataBlock.setEOR(true);
164 } else if (nextbyte == 2) {
165 dataBlock.setEOF(true);
166 } else if (nextbyte == 3) {
167 dataBlock.setEOR(true);
168 dataBlock.setEOF(true);
169 }
170 }
171 } else {
172 newbuf.writeByte((byte) (lastbyte & 0xFF));
173 }
174 lastbyte = 0;
175 }
176 } catch (IndexOutOfBoundsException e) {
177 // End of read
178 }
179 dataBlock.setBlock(newbuf);
180 return dataBlock;
181 }
182
183 protected DataBlock decodeRecord(ByteBuf buf, int length) {
184 FtpSeekAheadData sad = null;
185 try {
186 sad = new FtpSeekAheadData(buf);
187 } catch (SeekAheadNoBackArrayException e1) {
188 return decodeRecordStandard(buf, length);
189 }
190 ByteBuf newbuf = ByteBufAllocator.DEFAULT.buffer(length);
191 if (lastbyte == 0xFF) {
192 int nextbyte = sad.bytes[sad.pos++];
193 if (nextbyte == 0xFF) {
194 newbuf.writeByte((byte) (lastbyte & 0xFF));
195 } else {
196 if (nextbyte == 1) {
197 dataBlock.setEOR(true);
198 } else if (nextbyte == 2) {
199 dataBlock.setEOF(true);
200 } else if (nextbyte == 3) {
201 dataBlock.setEOR(true);
202 dataBlock.setEOF(true);
203 }
204 lastbyte = 0;
205 }
206 }
207 try {
208 while (sad.pos < sad.limit) {
209 lastbyte = sad.bytes[sad.pos++];
210 if (lastbyte == 0xFF) {
211 int nextbyte = sad.bytes[sad.pos++];
212 if (nextbyte == 0xFF) {
213 newbuf.writeByte((byte) (lastbyte & 0xFF));
214 } else {
215 if (nextbyte == 1) {
216 dataBlock.setEOR(true);
217 } else if (nextbyte == 2) {
218 dataBlock.setEOF(true);
219 } else if (nextbyte == 3) {
220 dataBlock.setEOR(true);
221 dataBlock.setEOF(true);
222 }
223 }
224 } else {
225 newbuf.writeByte((byte) (lastbyte & 0xFF));
226 }
227 lastbyte = 0;
228 }
229 } catch (IndexOutOfBoundsException e) {
230 // End of read
231 }
232 sad.setReadPosition(0);
233 dataBlock.setBlock(newbuf);
234 return dataBlock;
235 }
236
237 @Override
238 protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
239 // First test if the connection is fully ready (block might be
240 // transfered
241 // by client before connection is ready)
242 if (!isReady) {
243 if (!codecLocked.await(FtpConfiguration.getDATATIMEOUTCON())) {
244 throw new InvalidArgumentException("Codec not unlocked while should be");
245 }
246 isReady = true;
247 }
248 if (buf.readableBytes() == 0) {
249 return;
250 }
251 // If STREAM Mode, no task to do, just next filter
252 if (mode == TransferMode.STREAM) {
253 dataBlock = new DataBlock();
254 if (structure != TransferStructure.RECORD) {
255 ByteBuf newbuf = Unpooled.wrappedBuffer(buf);
256 buf.readerIndex(buf.readableBytes());
257 newbuf.retain();
258 dataBlock.setBlock(newbuf);
259 out.add(dataBlock);
260 return;
261 }
262 // Except if RECORD Structure!
263 int length = buf.readableBytes();
264 out.add(decodeRecord(buf, length));
265 return;
266 } else if (mode == TransferMode.BLOCK) {
267 // Now we are in BLOCK Mode
268 // Make sure if the length field was received.
269 if (buf.readableBytes() < 3) {
270 // The length field was not received yet - return null.
271 // This method will be invoked again when more packets are
272 // received and appended to the buffer.
273 return;
274 }
275
276 // The length field is in the buffer.
277
278 // Mark the current buffer position before reading the length field
279 // because the whole frame might not be in the buffer yet.
280 // We will reset the buffer position to the marked position if
281 // there's not enough bytes in the buffer.
282 buf.markReaderIndex();
283
284 if (dataBlock == null) {
285 dataBlock = new DataBlock();
286 }
287 // Read the descriptor
288 dataBlock.setDescriptor(buf.readByte());
289
290 // Read the length field.
291 byte upper = buf.readByte();
292 byte lower = buf.readByte();
293 dataBlock.setByteCount(upper, lower);
294
295 // Make sure if there's enough bytes in the buffer.
296 if (buf.readableBytes() < dataBlock.getByteCount()) {
297 // The whole bytes were not received yet - return null.
298 // This method will be invoked again when more packets are
299 // received and appended to the buffer.
300
301 // Reset to the marked position to read the length field again
302 // next time.
303 buf.resetReaderIndex();
304
305 return;
306 }
307 if (dataBlock.getByteCount() > 0) {
308 // There's enough bytes in the buffer. Read it.
309 dataBlock.setBlock(buf.readBytes(dataBlock.getByteCount()));
310 }
311 DataBlock returnDataBlock = dataBlock;
312 // Free the datablock for next frame
313 dataBlock = null;
314 // Successfully decoded a frame. Return the decoded frame.
315 out.add(returnDataBlock);
316 return;
317 }
318 // Type unimplemented
319 throw new InvalidArgumentException("Mode unimplemented: " + mode.name());
320 }
321
322 protected ByteBuf encodeRecordStandard(DataBlock msg, ByteBuf buffer) {
323 ByteBuf newbuf = ByteBufAllocator.DEFAULT.buffer(msg.getByteCount());
324 int newbyte = 0;
325 try {
326 while (true) {
327 newbyte = buffer.readByte();
328 if (newbyte == 0xFF) {
329 newbuf.writeByte((byte) (newbyte & 0xFF));
330 }
331 newbuf.writeByte((byte) (newbyte & 0xFF));
332 }
333 } catch (IndexOutOfBoundsException e) {
334 // end of read
335 }
336 int value = 0;
337 if (msg.isEOF()) {
338 value += 2;
339 }
340 if (msg.isEOR()) {
341 value += 1;
342 }
343 if (value > 0) {
344 newbuf.writeByte((byte) 0xFF);
345 newbuf.writeByte((byte) (value & 0xFF));
346 }
347 msg.clear();
348 return newbuf;
349 }
350
351 protected ByteBuf encodeRecord(DataBlock msg, ByteBuf buffer) {
352 FtpSeekAheadData sad = null;
353 try {
354 sad = new FtpSeekAheadData(buffer);
355 } catch (SeekAheadNoBackArrayException e1) {
356 return encodeRecordStandard(msg, buffer);
357 }
358 ByteBuf newbuf = ByteBufAllocator.DEFAULT.buffer(msg.getByteCount());
359 int newbyte = 0;
360 try {
361 while (sad.pos < sad.limit) {
362 newbyte = sad.bytes[sad.pos++];
363 if (newbyte == 0xFF) {
364 newbuf.writeByte((byte) (newbyte & 0xFF));
365 }
366 newbuf.writeByte((byte) (newbyte & 0xFF));
367 }
368 } catch (IndexOutOfBoundsException e) {
369 // end of read
370 }
371 int value = 0;
372 if (msg.isEOF()) {
373 value += 2;
374 }
375 if (msg.isEOR()) {
376 value += 1;
377 }
378 if (value > 0) {
379 newbuf.writeByte((byte) 0xFF);
380 newbuf.writeByte((byte) (value & 0xFF));
381 }
382 msg.clear();
383 sad.setReadPosition(0);
384 return newbuf;
385 }
386
387 /**
388 * Encode a DataBlock in the correct format for Mode
389 *
390 * @param msg
391 * @return the ByteBuf or null when the last block is already done
392 * @throws InvalidArgumentException
393 */
394 protected ByteBuf encode(DataBlock msg)
395 throws InvalidArgumentException {
396 if (msg.isCleared()) {
397 return null;
398 }
399 ByteBuf buffer = msg.getBlock();
400 if (mode == TransferMode.STREAM) {
401 // If record structure, special attention
402 if (structure == TransferStructure.RECORD) {
403 return encodeRecord(msg, buffer);
404 }
405 msg.clear();
406 return buffer;
407 } else if (mode == TransferMode.BLOCK) {
408 int length = msg.getByteCount();
409 ByteBuf newbuf = ByteBufAllocator.DEFAULT.buffer(length > 0xFFFF ? 0xFFFF + 3 : length + 3);
410 byte[] header = new byte[3];
411 // Is there any data left
412 if (length == 0) {
413 // It could be an empty block for EOR or EOF
414 if (msg.isEOF() || msg.isEOR()) {
415 header[0] = msg.getDescriptor();
416 header[1] = 0;
417 header[2] = 0;
418 newbuf.writeBytes(header);
419 // Next call will be the last one
420 msg.clear();
421 // return the last block
422 return newbuf;
423 }
424 // This was the very last call
425 msg.clear();
426 // return the end of encode
427 return null;
428 }
429 // Is this a Restart so only Markers
430 if (msg.isRESTART()) {
431 header[0] = msg.getDescriptor();
432 header[1] = msg.getByteCountUpper();
433 header[2] = msg.getByteCountLower();
434 newbuf.writeBytes(header);
435 newbuf.writeBytes(msg.getByteMarkers());
436 // Next call will be the last one
437 msg.clear();
438 // return the last block
439 return newbuf;
440 }
441 // Work on sub block, ignoring descriptor since it is not the last
442 // one
443 if (length > 0xFFFF) {
444 header[0] = 0;
445 header[1] = (byte) 0xFF;
446 header[2] = (byte) 0xFF;
447 newbuf.writeBytes(header);
448 // Now take the first 0xFFFF bytes from buffer
449 newbuf.writeBytes(msg.getBlock(), 0xFFFF);
450 length -= 0xFFFF;
451 msg.setByteCount(length);
452 // return the sub block
453 return newbuf;
454 }
455 // Last final block, using the descriptor
456 header[0] = msg.getDescriptor();
457 header[1] = msg.getByteCountUpper();
458 header[2] = msg.getByteCountLower();
459 newbuf.writeBytes(header);
460 // real data
461 newbuf.writeBytes(buffer, length);
462 // Next call will be the last one
463 msg.clear();
464 // return the last block
465 return newbuf;
466 }
467 // Mode unimplemented
468 throw new InvalidArgumentException("Mode unimplemented: " + mode.name());
469 }
470
/**
 * Gives the transfer mode currently applied by this codec.
 *
 * @return the mode
 */
public TransferMode getMode() {
    return mode;
}
477
/**
 * Changes the transfer mode applied by this codec.
 *
 * @param mode
 *            the mode to set
 */
public void setMode(TransferMode mode) {
    this.mode = mode;
}
485
/**
 * Gives the file structure currently applied by this codec.
 *
 * @return the structure
 */
public TransferStructure getStructure() {
    return structure;
}
492
/**
 * Changes the file structure applied by this codec.
 *
 * @param structure
 *            the structure to set
 */
public void setStructure(TransferStructure structure) {
    this.structure = structure;
}
500
501 @Override
502 protected void encode(ChannelHandlerContext ctx, DataBlock msg, ByteBuf out) throws Exception {
503 // First test if the connection is fully ready (block might be
504 // transfered
505 // by client before connection is ready)
506 if (!isReady) {
507 if (!codecLocked.await(FtpConfiguration.getDATATIMEOUTCON())) {
508 throw new InvalidArgumentException("Codec not unlocked while should be");
509 }
510 isReady = true;
511 }
512 ByteBuf next = encode(msg);
513 // Could be splitten in several block
514 while (next != null) {
515 out.writeBytes(next);
516 next = encode(msg);
517 }
518 }
519 }