1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18 package org.waarp.ftp.core.data.handler;
19
20 import java.util.List;
21
22 import io.netty.buffer.ByteBuf;
23 import io.netty.buffer.ByteBufAllocator;
24 import io.netty.buffer.Unpooled;
25 import io.netty.channel.ChannelHandlerContext;
26 import io.netty.handler.codec.ByteToMessageCodec;
27
28 import org.waarp.common.exception.InvalidArgumentException;
29 import org.waarp.common.file.DataBlock;
30 import org.waarp.common.future.WaarpFuture;
31 import org.waarp.ftp.core.command.FtpArgumentCode.TransferMode;
32 import org.waarp.ftp.core.command.FtpArgumentCode.TransferStructure;
33 import org.waarp.ftp.core.config.FtpConfiguration;
34 import org.waarp.ftp.core.data.handler.FtpSeekAheadData.SeekAheadNoBackArrayException;
35
36
37
38
39
40
41
42
43
44
45 public class FtpDataModeCodec extends ByteToMessageCodec<DataBlock> {
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89 private TransferMode mode = null;
90
91
92
93
94 private TransferStructure structure = null;
95
96
97
98
99 private DataBlock dataBlock = null;
100
101
102
103
104 private int lastbyte = 0;
105
106
107
108
109 private volatile boolean isReady = false;
110
111
112
113
114
115 private final WaarpFuture codecLocked = new WaarpFuture();
116
117
118
119
120
/**
 * Builds a codec for the given transfer mode and structure.
 * Decoding and encoding wait until {@link #setCodecReady()} has been called.
 *
 * @param mode the transfer mode to apply
 * @param structure the transfer structure to apply
 */
public FtpDataModeCodec(TransferMode mode, TransferStructure structure) {
    // The no-arg superclass constructor is invoked implicitly.
    this.structure = structure;
    this.mode = mode;
}
126
127
128
129
130
131
132 public void setCodecReady() {
133 codecLocked.setSuccess();
134 }
135
136 protected DataBlock decodeRecordStandard(ByteBuf buf, int length) {
137 ByteBuf newbuf = ByteBufAllocator.DEFAULT.buffer(length);
138 if (lastbyte == 0xFF) {
139 int nextbyte = buf.readByte();
140 if (nextbyte == 0xFF) {
141 newbuf.writeByte((byte) (lastbyte & 0xFF));
142 } else {
143 if (nextbyte == 1) {
144 dataBlock.setEOR(true);
145 } else if (nextbyte == 2) {
146 dataBlock.setEOF(true);
147 } else if (nextbyte == 3) {
148 dataBlock.setEOR(true);
149 dataBlock.setEOF(true);
150 }
151 lastbyte = 0;
152 }
153 }
154 try {
155 while (true) {
156 lastbyte = buf.readByte();
157 if (lastbyte == 0xFF) {
158 int nextbyte = buf.readByte();
159 if (nextbyte == 0xFF) {
160 newbuf.writeByte((byte) (lastbyte & 0xFF));
161 } else {
162 if (nextbyte == 1) {
163 dataBlock.setEOR(true);
164 } else if (nextbyte == 2) {
165 dataBlock.setEOF(true);
166 } else if (nextbyte == 3) {
167 dataBlock.setEOR(true);
168 dataBlock.setEOF(true);
169 }
170 }
171 } else {
172 newbuf.writeByte((byte) (lastbyte & 0xFF));
173 }
174 lastbyte = 0;
175 }
176 } catch (IndexOutOfBoundsException e) {
177
178 }
179 dataBlock.setBlock(newbuf);
180 return dataBlock;
181 }
182
183 protected DataBlock decodeRecord(ByteBuf buf, int length) {
184 FtpSeekAheadData sad = null;
185 try {
186 sad = new FtpSeekAheadData(buf);
187 } catch (SeekAheadNoBackArrayException e1) {
188 return decodeRecordStandard(buf, length);
189 }
190 ByteBuf newbuf = ByteBufAllocator.DEFAULT.buffer(length);
191 if (lastbyte == 0xFF) {
192 int nextbyte = sad.bytes[sad.pos++];
193 if (nextbyte == 0xFF) {
194 newbuf.writeByte((byte) (lastbyte & 0xFF));
195 } else {
196 if (nextbyte == 1) {
197 dataBlock.setEOR(true);
198 } else if (nextbyte == 2) {
199 dataBlock.setEOF(true);
200 } else if (nextbyte == 3) {
201 dataBlock.setEOR(true);
202 dataBlock.setEOF(true);
203 }
204 lastbyte = 0;
205 }
206 }
207 try {
208 while (sad.pos < sad.limit) {
209 lastbyte = sad.bytes[sad.pos++];
210 if (lastbyte == 0xFF) {
211 int nextbyte = sad.bytes[sad.pos++];
212 if (nextbyte == 0xFF) {
213 newbuf.writeByte((byte) (lastbyte & 0xFF));
214 } else {
215 if (nextbyte == 1) {
216 dataBlock.setEOR(true);
217 } else if (nextbyte == 2) {
218 dataBlock.setEOF(true);
219 } else if (nextbyte == 3) {
220 dataBlock.setEOR(true);
221 dataBlock.setEOF(true);
222 }
223 }
224 } else {
225 newbuf.writeByte((byte) (lastbyte & 0xFF));
226 }
227 lastbyte = 0;
228 }
229 } catch (IndexOutOfBoundsException e) {
230
231 }
232 sad.setReadPosition(0);
233 dataBlock.setBlock(newbuf);
234 return dataBlock;
235 }
236
237 @Override
238 protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
239
240
241
242 if (!isReady) {
243 if (!codecLocked.await(FtpConfiguration.getDATATIMEOUTCON())) {
244 throw new InvalidArgumentException("Codec not unlocked while should be");
245 }
246 isReady = true;
247 }
248 if (buf.readableBytes() == 0) {
249 return;
250 }
251
252 if (mode == TransferMode.STREAM) {
253 dataBlock = new DataBlock();
254 if (structure != TransferStructure.RECORD) {
255 ByteBuf newbuf = Unpooled.wrappedBuffer(buf);
256 buf.readerIndex(buf.readableBytes());
257 newbuf.retain();
258 dataBlock.setBlock(newbuf);
259 out.add(dataBlock);
260 return;
261 }
262
263 int length = buf.readableBytes();
264 out.add(decodeRecord(buf, length));
265 return;
266 } else if (mode == TransferMode.BLOCK) {
267
268
269 if (buf.readableBytes() < 3) {
270
271
272
273 return;
274 }
275
276
277
278
279
280
281
282 buf.markReaderIndex();
283
284 if (dataBlock == null) {
285 dataBlock = new DataBlock();
286 }
287
288 dataBlock.setDescriptor(buf.readByte());
289
290
291 byte upper = buf.readByte();
292 byte lower = buf.readByte();
293 dataBlock.setByteCount(upper, lower);
294
295
296 if (buf.readableBytes() < dataBlock.getByteCount()) {
297
298
299
300
301
302
303 buf.resetReaderIndex();
304
305 return;
306 }
307 if (dataBlock.getByteCount() > 0) {
308
309 dataBlock.setBlock(buf.readBytes(dataBlock.getByteCount()));
310 }
311 DataBlock returnDataBlock = dataBlock;
312
313 dataBlock = null;
314
315 out.add(returnDataBlock);
316 return;
317 }
318
319 throw new InvalidArgumentException("Mode unimplemented: " + mode.name());
320 }
321
322 protected ByteBuf encodeRecordStandard(DataBlock msg, ByteBuf buffer) {
323 ByteBuf newbuf = ByteBufAllocator.DEFAULT.buffer(msg.getByteCount());
324 int newbyte = 0;
325 try {
326 while (true) {
327 newbyte = buffer.readByte();
328 if (newbyte == 0xFF) {
329 newbuf.writeByte((byte) (newbyte & 0xFF));
330 }
331 newbuf.writeByte((byte) (newbyte & 0xFF));
332 }
333 } catch (IndexOutOfBoundsException e) {
334
335 }
336 int value = 0;
337 if (msg.isEOF()) {
338 value += 2;
339 }
340 if (msg.isEOR()) {
341 value += 1;
342 }
343 if (value > 0) {
344 newbuf.writeByte((byte) 0xFF);
345 newbuf.writeByte((byte) (value & 0xFF));
346 }
347 msg.clear();
348 return newbuf;
349 }
350
351 protected ByteBuf encodeRecord(DataBlock msg, ByteBuf buffer) {
352 FtpSeekAheadData sad = null;
353 try {
354 sad = new FtpSeekAheadData(buffer);
355 } catch (SeekAheadNoBackArrayException e1) {
356 return encodeRecordStandard(msg, buffer);
357 }
358 ByteBuf newbuf = ByteBufAllocator.DEFAULT.buffer(msg.getByteCount());
359 int newbyte = 0;
360 try {
361 while (sad.pos < sad.limit) {
362 newbyte = sad.bytes[sad.pos++];
363 if (newbyte == 0xFF) {
364 newbuf.writeByte((byte) (newbyte & 0xFF));
365 }
366 newbuf.writeByte((byte) (newbyte & 0xFF));
367 }
368 } catch (IndexOutOfBoundsException e) {
369
370 }
371 int value = 0;
372 if (msg.isEOF()) {
373 value += 2;
374 }
375 if (msg.isEOR()) {
376 value += 1;
377 }
378 if (value > 0) {
379 newbuf.writeByte((byte) 0xFF);
380 newbuf.writeByte((byte) (value & 0xFF));
381 }
382 msg.clear();
383 sad.setReadPosition(0);
384 return newbuf;
385 }
386
387
388
389
390
391
392
393
394 protected ByteBuf encode(DataBlock msg)
395 throws InvalidArgumentException {
396 if (msg.isCleared()) {
397 return null;
398 }
399 ByteBuf buffer = msg.getBlock();
400 if (mode == TransferMode.STREAM) {
401
402 if (structure == TransferStructure.RECORD) {
403 return encodeRecord(msg, buffer);
404 }
405 msg.clear();
406 return buffer;
407 } else if (mode == TransferMode.BLOCK) {
408 int length = msg.getByteCount();
409 ByteBuf newbuf = ByteBufAllocator.DEFAULT.buffer(length > 0xFFFF ? 0xFFFF + 3 : length + 3);
410 byte[] header = new byte[3];
411
412 if (length == 0) {
413
414 if (msg.isEOF() || msg.isEOR()) {
415 header[0] = msg.getDescriptor();
416 header[1] = 0;
417 header[2] = 0;
418 newbuf.writeBytes(header);
419
420 msg.clear();
421
422 return newbuf;
423 }
424
425 msg.clear();
426
427 return null;
428 }
429
430 if (msg.isRESTART()) {
431 header[0] = msg.getDescriptor();
432 header[1] = msg.getByteCountUpper();
433 header[2] = msg.getByteCountLower();
434 newbuf.writeBytes(header);
435 newbuf.writeBytes(msg.getByteMarkers());
436
437 msg.clear();
438
439 return newbuf;
440 }
441
442
443 if (length > 0xFFFF) {
444 header[0] = 0;
445 header[1] = (byte) 0xFF;
446 header[2] = (byte) 0xFF;
447 newbuf.writeBytes(header);
448
449 newbuf.writeBytes(msg.getBlock(), 0xFFFF);
450 length -= 0xFFFF;
451 msg.setByteCount(length);
452
453 return newbuf;
454 }
455
456 header[0] = msg.getDescriptor();
457 header[1] = msg.getByteCountUpper();
458 header[2] = msg.getByteCountLower();
459 newbuf.writeBytes(header);
460
461 newbuf.writeBytes(buffer, length);
462
463 msg.clear();
464
465 return newbuf;
466 }
467
468 throw new InvalidArgumentException("Mode unimplemented: " + mode.name());
469 }
470
471
472
473
474 public TransferMode getMode() {
475 return mode;
476 }
477
478
479
480
481
482 public void setMode(TransferMode mode) {
483 this.mode = mode;
484 }
485
486
487
488
489 public TransferStructure getStructure() {
490 return structure;
491 }
492
493
494
495
496
497 public void setStructure(TransferStructure structure) {
498 this.structure = structure;
499 }
500
501 @Override
502 protected void encode(ChannelHandlerContext ctx, DataBlock msg, ByteBuf out) throws Exception {
503
504
505
506 if (!isReady) {
507 if (!codecLocked.await(FtpConfiguration.getDATATIMEOUTCON())) {
508 throw new InvalidArgumentException("Codec not unlocked while should be");
509 }
510 isReady = true;
511 }
512 ByteBuf next = encode(msg);
513
514 while (next != null) {
515 out.writeBytes(next);
516 next = encode(msg);
517 }
518 }
519 }