1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 package org.waarp.openr66.protocol.localhandler.packet;
21
22 import io.netty.buffer.ByteBuf;
23 import io.netty.buffer.ByteBufAllocator;
24 import org.waarp.common.digest.FilesystemBasedDigest;
25 import org.waarp.common.logging.WaarpLogger;
26 import org.waarp.common.logging.WaarpLoggerFactory;
27 import org.waarp.common.utility.ParametersChecker;
28 import org.waarp.common.utility.WaarpNettyUtil;
29 import org.waarp.openr66.context.R66Session;
30 import org.waarp.openr66.protocol.exception.OpenR66ProtocolPacketException;
31 import org.waarp.openr66.protocol.localhandler.LocalChannelReference;
32 import org.waarp.openr66.protocol.utils.FileUtils;
33
34 import java.util.Arrays;
35
36
37
38
39
40
41 public class DataPacket extends AbstractLocalPacket {
42 private static final WaarpLogger logger =
43 WaarpLoggerFactory.getLogger(DataPacket.class);
44
45 private final int packetRank;
46
47 private int lengthPacket;
48
49 private byte[] data;
50 private ByteBuf dataRecv;
51
52 private byte[] key;
53
54
55
56
57
58
59
60
61
62
63
64 public static DataPacket createFromBuffer(final int headerLength,
65 final int middleLength,
66 final int endLength,
67 final ByteBuf buf)
68 throws OpenR66ProtocolPacketException {
69 if (headerLength - 1 <= 0) {
70 throw new OpenR66ProtocolPacketException("Not enough data");
71 }
72 if (middleLength <= 0) {
73 throw new OpenR66ProtocolPacketException("Not enough data");
74 }
75 final int packetRank = buf.readInt();
76 final int index = buf.readerIndex();
77 final ByteBuf recvData = buf.retainedSlice(index, middleLength);
78 buf.skipBytes(middleLength);
79 final byte[] key = endLength > 0? new byte[endLength] : EMPTY_ARRAY;
80 if (endLength > 0) {
81 buf.readBytes(key);
82 }
83 return new DataPacket(packetRank, recvData, key);
84 }
85
86
87
88
89
90
91 private DataPacket(final int packetRank, final ByteBuf data,
92 final byte[] key) {
93 this.packetRank = packetRank;
94 this.dataRecv = data;
95 this.data = null;
96 this.key = key == null? EMPTY_ARRAY : key;
97 lengthPacket = dataRecv.readableBytes();
98 }
99
100
101
102
103
104
105
106 public DataPacket(final int packetRank, final byte[] data, final int length,
107 final byte[] key) {
108 this.packetRank = packetRank;
109 this.data = data;
110 this.dataRecv = null;
111 this.key = key == null? EMPTY_ARRAY : key;
112 lengthPacket = length;
113 }
114
115
116
117
118
119
120
121 public final void updateFromCompressionCodec(final byte[] data,
122 final int length) {
123 this.data = data;
124 lengthPacket = length;
125 }
126
127 @Override
128 public final boolean hasGlobalBuffer() {
129 return false;
130 }
131
132 @Override
133 public final void createAllBuffers(final LocalChannelReference lcr,
134 final int networkHeader) {
135 throw new IllegalStateException("Should not be called");
136 }
137
138 @Override
139 public final synchronized void createEnd(final LocalChannelReference lcr) {
140 end = WaarpNettyUtil.wrappedBuffer(key);
141 }
142
143 @Override
144 public final synchronized void createHeader(final LocalChannelReference lcr) {
145 header = ByteBufAllocator.DEFAULT.ioBuffer(4, 4);
146 header.writeInt(packetRank);
147 }
148
149 @Override
150 public final synchronized void createMiddle(final LocalChannelReference lcr) {
151 if (dataRecv != null) {
152 middle = dataRecv;
153 } else {
154 middle = WaarpNettyUtil.wrappedBuffer(data);
155 middle.writerIndex(lengthPacket);
156 }
157 }
158
159 @Override
160 public final byte getType() {
161 return LocalPacketFactory.DATAPACKET;
162 }
163
164 @Override
165 public final String toString() {
166 return "DataPacket: " + packetRank + ':' + lengthPacket;
167 }
168
169
170
171
172 public final int getPacketRank() {
173 return packetRank;
174 }
175
176
177
178
179 public final int getLengthPacket() {
180 return lengthPacket;
181 }
182
183
184
185
186
187
188 public final synchronized void createByteBufFromRecv(
189 final R66Session session) {
190
191 if (data == null) {
192 final byte[] buffer = session.getReusableDataPacketBuffer(lengthPacket);
193 dataRecv.getBytes(dataRecv.readerIndex(), buffer, 0, lengthPacket);
194 data = buffer;
195 WaarpNettyUtil.release(dataRecv);
196 dataRecv = null;
197 }
198 }
199
200
201
202
203 public final byte[] getData() {
204 ParametersChecker.checkParameter("Data is not setup correctly", data,
205 logger);
206 return data;
207 }
208
209
210
211
212 public final byte[] getKey() {
213 return key;
214 }
215
216
217
218
219 public final boolean isKeyValid(final FilesystemBasedDigest digestBlock,
220 final FilesystemBasedDigest digestGlobal,
221 final FilesystemBasedDigest digestLocal) {
222 ParametersChecker.checkParameter("Data is not setup correctly", data,
223 logger);
224 if (key == null || key.length == 0) {
225 if (digestGlobal != null || digestLocal != null) {
226 FileUtils.computeGlobalHash(digestGlobal, digestLocal, data,
227 lengthPacket);
228 }
229 logger.error("Should received a Digest but don't");
230 return false;
231 }
232 digestBlock.Update(data, 0, lengthPacket);
233 final byte[] newkey = digestBlock.Final();
234 FileUtils.computeGlobalHash(digestGlobal, digestLocal, data, lengthPacket);
235 final boolean equal = Arrays.equals(key, newkey);
236 if (!equal) {
237 logger.error("DIGEST {} != {} for {} bytes using {} at rank {}",
238 FilesystemBasedDigest.getHex(key),
239 FilesystemBasedDigest.getHex(newkey), lengthPacket,
240 digestBlock.getAlgo(), packetRank);
241 } else if (logger.isDebugEnabled()) {
242 logger.debug("DIGEST {} == {} for {} bytes using {} at rank {}",
243 FilesystemBasedDigest.getHex(key),
244 FilesystemBasedDigest.getHex(newkey), lengthPacket,
245 digestBlock.getAlgo(), packetRank);
246 }
247 return equal;
248 }
249
250 @Override
251 public final synchronized void clear() {
252 super.clear();
253 WaarpNettyUtil.release(dataRecv);
254 dataRecv = null;
255 data = null;
256 key = null;
257 lengthPacket = 0;
258 }
259 }