1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.waarp.ftp.core.data.handler;
21
22 import io.netty.buffer.ByteBuf;
23 import io.netty.buffer.ByteBufAllocator;
24 import io.netty.channel.ChannelHandlerContext;
25 import io.netty.handler.codec.ByteToMessageCodec;
26 import org.waarp.common.exception.InvalidArgumentException;
27 import org.waarp.common.file.DataBlock;
28 import org.waarp.common.future.WaarpFuture;
29 import org.waarp.common.logging.SysErrLogger;
30 import org.waarp.common.utility.WaarpNettyUtil;
31 import org.waarp.compress.zlib.ZlibCodec;
32 import org.waarp.ftp.core.command.FtpArgumentCode.TransferMode;
33 import org.waarp.ftp.core.command.FtpArgumentCode.TransferStructure;
34 import org.waarp.ftp.core.config.FtpInternalConfiguration;
35 import org.waarp.ftp.core.data.handler.FtpSeekAheadData.SeekAheadNoBackArrayException;
36
37 import java.io.IOException;
38 import java.util.List;
39 import java.util.concurrent.atomic.AtomicBoolean;
40
41
42
43
44
45
46
47 public class FtpDataModeCodec extends ByteToMessageCodec<DataBlock> {
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86 private TransferMode mode;
87
88
89
90
91 private TransferStructure structure;
92
93
94
95
96 private DataBlock dataBlock;
97
98
99
100
101 private int lastbyte;
102
103
104
105
106 private ZlibCodec zlibCodec = new ZlibCodec();
107
108
109
110
111 private final AtomicBoolean isReady = new AtomicBoolean(false);
112
113
114
115
116
117
118 private final WaarpFuture codecLocked = new WaarpFuture();
119
120
121
122
123
/**
 * Creates a codec bound to the given transfer mode and structure.
 *
 * @param mode transfer mode used when decoding and encoding
 * @param structure transfer structure used when decoding and encoding
 */
public FtpDataModeCodec(final TransferMode mode,
                        final TransferStructure structure) {
  super();
  this.structure = structure;
  this.mode = mode;
}
129
130
131
132
133
134 public final void setCodecReady() {
135 codecLocked.setSuccess();
136 }
137
138 protected final DataBlock decodeRecordStandard(final ByteBuf buf,
139 final int length) {
140 final ByteBuf newbuf = ByteBufAllocator.DEFAULT.buffer(length, length);
141 if (lastbyte == 0xFF) {
142 if (readByteForDataBlock(buf, newbuf)) {
143 lastbyte = 0;
144 }
145 }
146 try {
147 while (buf.readableBytes() > 0) {
148 lastbyte = buf.readUnsignedByte();
149 if (lastbyte == 0xFF) {
150 readByteForDataBlock(buf, newbuf);
151 } else {
152 newbuf.writeByte((byte) (lastbyte & 0xFF));
153 }
154 lastbyte = 0;
155 }
156 } catch (final IndexOutOfBoundsException e) {
157
158 }
159 dataBlock.setBlock(newbuf);
160 return dataBlock;
161 }
162
163 private boolean readByteForDataBlock(final ByteBuf buf,
164 final ByteBuf newbuf) {
165 final int nextbyte = buf.readUnsignedByte();
166 if (nextbyte == 0xFF) {
167 newbuf.writeByte((byte) (lastbyte & 0xFF));
168 return false;
169 } else {
170 if (nextbyte == 1) {
171 dataBlock.setEOR(true);
172 } else if (nextbyte == 2) {
173 dataBlock.setEOF(true);
174 } else if (nextbyte == 3) {
175 dataBlock.setEOR(true);
176 dataBlock.setEOF(true);
177 }
178 return true;
179 }
180 }
181
182 protected final DataBlock decodeRecord(final ByteBuf buf, final int length) {
183 final FtpSeekAheadData sad;
184 try {
185 sad = new FtpSeekAheadData(buf);
186 } catch (final SeekAheadNoBackArrayException e1) {
187 return decodeRecordStandard(buf, length);
188 }
189 final ByteBuf newbuf = ByteBufAllocator.DEFAULT.buffer(length, length);
190 if (lastbyte == 0xFF) {
191 if (readBytesFromSad(sad, newbuf)) {
192 lastbyte = 0;
193 }
194 }
195 try {
196 while (sad.pos < sad.limit) {
197 lastbyte = sad.bytes[sad.pos++] & 0xFF;
198 if (lastbyte == 0xFF) {
199 readBytesFromSad(sad, newbuf);
200 } else {
201 newbuf.writeByte((byte) (lastbyte & 0xFF));
202 }
203 lastbyte = 0;
204 }
205 } catch (final IndexOutOfBoundsException e) {
206
207 }
208 sad.setReadPosition(0);
209 dataBlock.setBlock(newbuf);
210 return dataBlock;
211 }
212
213 private boolean readBytesFromSad(final FtpSeekAheadData sad,
214 final ByteBuf newbuf) {
215 final int nextbyte = sad.bytes[sad.pos++] & 0xFF;
216 if (nextbyte == 0xFF) {
217 newbuf.writeByte((byte) (lastbyte & 0xFF));
218 return false;
219 } else {
220 if (nextbyte == 1) {
221 dataBlock.setEOR(true);
222 } else if (nextbyte == 2) {
223 dataBlock.setEOF(true);
224 } else if (nextbyte == 3) {
225 dataBlock.setEOR(true);
226 dataBlock.setEOF(true);
227 }
228 return true;
229 }
230 }
231
232 private void checkCodecUnlocked() throws InterruptedException {
233 if (!isReady.get()) {
234 for (int i = 0; i < FtpInternalConfiguration.RETRYNB; i++) {
235 if (!codecLocked.awaitOrInterruptible(
236 FtpInternalConfiguration.RETRYINMS)) {
237 Thread.sleep(FtpInternalConfiguration.RETRYINMS);
238 } else {
239 break;
240 }
241 }
242 if (!codecLocked.awaitOrInterruptible(
243 FtpInternalConfiguration.RETRYINMS)) {
244 Thread.sleep(FtpInternalConfiguration.RETRYINMS);
245
246 setCodecReady();
247 }
248 isReady.set(true);
249 }
250 }
251
252 @Override
253 protected void decode(final ChannelHandlerContext ctx, final ByteBuf buf,
254 final List<Object> out) throws Exception {
255
256
257 checkCodecUnlocked();
258 if (buf.readableBytes() == 0) {
259 return;
260 }
261
262 if (mode == TransferMode.STREAM) {
263 dataBlock = new DataBlock();
264 if (structure != TransferStructure.RECORD) {
265 final ByteBuf newbuf = buf.slice();
266 buf.readerIndex(buf.readableBytes());
267 WaarpNettyUtil.retain(newbuf);
268 dataBlock.setBlock(newbuf);
269 out.add(dataBlock);
270 return;
271 }
272
273 final int length = buf.readableBytes();
274 out.add(decodeRecord(buf, length));
275 return;
276 } else if (mode == TransferMode.BLOCK) {
277
278
279 if (buf.readableBytes() < 3) {
280
281
282
283 return;
284 }
285
286
287
288
289
290
291
292 buf.markReaderIndex();
293
294 if (dataBlock == null) {
295 dataBlock = new DataBlock();
296 }
297
298 dataBlock.setDescriptor(buf.readByte());
299
300
301 final byte upper = buf.readByte();
302 final byte lower = buf.readByte();
303 dataBlock.setByteCount(upper, lower);
304
305
306 if (buf.readableBytes() < dataBlock.getByteCount()) {
307
308
309
310
311
312
313 buf.resetReaderIndex();
314
315 return;
316 }
317 if (dataBlock.getByteCount() > 0) {
318
319 dataBlock.setBlock(buf.readBytes(dataBlock.getByteCount()));
320 }
321 final DataBlock returnDataBlock = dataBlock;
322
323 dataBlock = null;
324
325 out.add(returnDataBlock);
326 return;
327 } else if (mode == TransferMode.ZLIB) {
328 dataBlock = new DataBlock();
329 if (structure != TransferStructure.RECORD) {
330 zlibCodec.writeForDecompression(buf);
331 dataBlock.setBlock(zlibCodec.readCodec());
332 out.add(dataBlock);
333 return;
334 }
335
336 throw new InvalidArgumentException(
337 "Mode unimplemented: " + mode.name() + " with " + structure.name());
338 }
339
340 throw new InvalidArgumentException("Mode unimplemented: " + mode.name());
341 }
342
343 @Override
344 protected void decodeLast(final ChannelHandlerContext ctx, final ByteBuf in,
345 final List<Object> out) throws Exception {
346 if (mode == TransferMode.ZLIB) {
347 final byte[] bytes = zlibCodec.finishCodec();
348 if (bytes != null && bytes.length != 0) {
349 dataBlock = new DataBlock();
350 dataBlock.setBlock(bytes);
351 dataBlock.setEOF(true);
352 out.add(dataBlock);
353 }
354 }
355 }
356
357 protected final ByteBuf encodeRecord(final DataBlock msg,
358 final byte[] buffer) {
359 final int size = msg.getByteCount();
360 final ByteBuf newbuf = ByteBufAllocator.DEFAULT.ioBuffer(size);
361 int newbyte;
362 try {
363 int pos = 0;
364 final int limit = buffer.length;
365 while (pos < limit) {
366 newbyte = buffer[pos++] & 0xFF;
367 if (newbyte == 0xFF) {
368 newbuf.writeByte((byte) 0xFF);
369 }
370 newbuf.writeByte((byte) (newbyte & 0xFF));
371 }
372 } catch (final IndexOutOfBoundsException e) {
373
374 }
375 int value = 0;
376 if (msg.isEOF()) {
377 value += 2;
378 }
379 if (msg.isEOR()) {
380 value += 1;
381 }
382 if (value > 0) {
383 newbuf.writeByte((byte) 0xFF);
384 newbuf.writeByte((byte) (value & 0xFF));
385 }
386 msg.clear();
387 return newbuf;
388 }
389
390
391
392
393
394
395
396
397
398
399 protected final ByteBuf encode(final DataBlock msg)
400 throws InvalidArgumentException {
401 if (msg.isCleared()) {
402 return null;
403 }
404 final byte[] bytes = msg.getByteBlock();
405 if (mode == TransferMode.STREAM) {
406
407 if (structure == TransferStructure.RECORD) {
408 return encodeRecord(msg, bytes);
409 }
410 msg.clear();
411 return WaarpNettyUtil.wrappedBuffer(bytes);
412 } else if (mode == TransferMode.BLOCK) {
413 int length = msg.getByteCount();
414 final int size = length > 0xFFFF? 0xFFFF + 3 : length + 3;
415 final ByteBuf newbuf = ByteBufAllocator.DEFAULT.ioBuffer(size, size);
416 final byte[] header = { 0, 0, 0 };
417
418 if (length == 0) {
419
420 if (msg.isEOF() || msg.isEOR()) {
421 header[0] = msg.getDescriptor();
422 header[1] = 0;
423 header[2] = 0;
424 newbuf.writeBytes(header);
425
426 msg.clear();
427
428 return newbuf;
429 }
430
431 msg.clear();
432
433 return null;
434 }
435
436 if (msg.isRESTART()) {
437 header[0] = msg.getDescriptor();
438 header[1] = msg.getByteCountUpper();
439 header[2] = msg.getByteCountLower();
440 newbuf.writeBytes(header);
441 newbuf.writeBytes(msg.getByteMarkers());
442
443 msg.clear();
444
445 return newbuf;
446 }
447
448
449 if (length > 0xFFFF) {
450 header[0] = 0;
451 header[1] = (byte) 0xFF;
452 header[2] = (byte) 0xFF;
453 newbuf.writeBytes(header);
454
455 newbuf.writeBytes(bytes, msg.getOffset(), 0xFFFF);
456 msg.addOffset(0xFFFF);
457 length -= 0xFFFF;
458 msg.setByteCount(length);
459
460 return newbuf;
461 }
462
463 header[0] = msg.getDescriptor();
464 header[1] = msg.getByteCountUpper();
465 header[2] = msg.getByteCountLower();
466 newbuf.writeBytes(header);
467
468 newbuf.writeBytes(bytes, msg.getOffset(), length);
469
470 msg.clear();
471
472 return newbuf;
473 } else if (mode == TransferMode.ZLIB) {
474
475 if (structure == TransferStructure.RECORD) {
476 throw new InvalidArgumentException(
477 "Mode unimplemented: " + mode.name() + " with " + structure.name());
478 }
479 final boolean last = msg.isEOF();
480 msg.clear();
481 try {
482 zlibCodec.writeForCompression(bytes);
483 } catch (final IOException e) {
484 throw new InvalidArgumentException(e.getMessage());
485 }
486 if (last) {
487 try {
488 return WaarpNettyUtil.wrappedBuffer(zlibCodec.finishCodec());
489 } catch (final IOException e) {
490 throw new InvalidArgumentException(e.getMessage());
491 }
492 }
493 return WaarpNettyUtil.wrappedBuffer(zlibCodec.readCodec());
494 } else {
495
496 throw new InvalidArgumentException("Mode unimplemented: " + mode.name());
497 }
498 }
499
500
501
502
503 public final TransferMode getMode() {
504 return mode;
505 }
506
507
508
509
510 public final void setMode(final TransferMode mode) {
511 this.mode = mode;
512 try {
513 zlibCodec.finishCodec();
514 } catch (final IOException e) {
515 SysErrLogger.FAKE_LOGGER.ignoreLog(e);
516 }
517 }
518
519
520
521
522 public final TransferStructure getStructure() {
523 return structure;
524 }
525
526
527
528
529 public final void setStructure(final TransferStructure structure) {
530 this.structure = structure;
531 }
532
533 @Override
534 protected void encode(final ChannelHandlerContext ctx, final DataBlock msg,
535 final ByteBuf out) throws Exception {
536
537
538 checkCodecUnlocked();
539 ByteBuf next = encode(msg);
540
541 while (next != null) {
542 out.writeBytes(next);
543 WaarpNettyUtil.release(next);
544 next = encode(msg);
545 }
546 }
547 }