DataPacket.java
/*
* This file is part of Waarp Project (named also Waarp or GG).
*
* Copyright (c) 2019, Waarp SAS, and individual contributors by the @author
* tags. See the COPYRIGHT.txt in the distribution for a full listing of
* individual contributors.
*
* All Waarp Project is free software: you can redistribute it and/or
* modify it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or (at your
* option) any later version.
*
* Waarp is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR
* A PARTICULAR PURPOSE. See the GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License along with
* Waarp . If not, see <http://www.gnu.org/licenses/>.
*/
package org.waarp.openr66.protocol.localhandler.packet;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import org.waarp.common.digest.FilesystemBasedDigest;
import org.waarp.common.logging.WaarpLogger;
import org.waarp.common.logging.WaarpLoggerFactory;
import org.waarp.common.utility.ParametersChecker;
import org.waarp.common.utility.WaarpNettyUtil;
import org.waarp.openr66.context.R66Session;
import org.waarp.openr66.protocol.exception.OpenR66ProtocolPacketException;
import org.waarp.openr66.protocol.localhandler.LocalChannelReference;
import org.waarp.openr66.protocol.utils.FileUtils;
import java.util.Arrays;
/**
* Data packet
* <p>
* header = packetRank middle = data end = key
*/
public class DataPacket extends AbstractLocalPacket {
private static final WaarpLogger logger =
WaarpLoggerFactory.getLogger(DataPacket.class);
private final int packetRank;
private int lengthPacket;
private byte[] data;
private ByteBuf dataRecv;
private byte[] key;
/**
* @param headerLength
* @param middleLength
* @param endLength
* @param buf
*
* @return the new DataPacket from buffer
*
* @throws OpenR66ProtocolPacketException
*/
public static DataPacket createFromBuffer(final int headerLength,
final int middleLength,
final int endLength,
final ByteBuf buf)
throws OpenR66ProtocolPacketException {
if (headerLength - 1 <= 0) {
throw new OpenR66ProtocolPacketException("Not enough data");
}
if (middleLength <= 0) {
throw new OpenR66ProtocolPacketException("Not enough data");
}
final int packetRank = buf.readInt();
final int index = buf.readerIndex();
final ByteBuf recvData = buf.retainedSlice(index, middleLength);
buf.skipBytes(middleLength);
final byte[] key = endLength > 0? new byte[endLength] : EMPTY_ARRAY;
if (endLength > 0) {
buf.readBytes(key);
}
return new DataPacket(packetRank, recvData, key);
}
/**
* @param packetRank
* @param data
* @param key
*/
private DataPacket(final int packetRank, final ByteBuf data,
final byte[] key) {
this.packetRank = packetRank;
this.dataRecv = data;
this.data = null;
this.key = key == null? EMPTY_ARRAY : key;
lengthPacket = dataRecv.readableBytes();
}
/**
* @param packetRank
* @param data
* @param length
* @param key
*/
public DataPacket(final int packetRank, final byte[] data, final int length,
final byte[] key) {
this.packetRank = packetRank;
this.data = data;
this.dataRecv = null;
this.key = key == null? EMPTY_ARRAY : key;
lengthPacket = length;
}
/**
* When using compression/decompression, data can be changed
*
* @param data
* @param length
*/
public final void updateFromCompressionCodec(final byte[] data,
final int length) {
this.data = data;
lengthPacket = length;
}
@Override
public final boolean hasGlobalBuffer() {
return false;
}
@Override
public final void createAllBuffers(final LocalChannelReference lcr,
final int networkHeader) {
throw new IllegalStateException("Should not be called");
}
@Override
public final synchronized void createEnd(final LocalChannelReference lcr) {
end = WaarpNettyUtil.wrappedBuffer(key);
}
@Override
public final synchronized void createHeader(final LocalChannelReference lcr) {
header = ByteBufAllocator.DEFAULT.ioBuffer(4, 4);
header.writeInt(packetRank);
}
@Override
public final synchronized void createMiddle(final LocalChannelReference lcr) {
if (dataRecv != null) {
middle = dataRecv;
} else {
middle = WaarpNettyUtil.wrappedBuffer(data);
middle.writerIndex(lengthPacket);
}
}
@Override
public final byte getType() {
return LocalPacketFactory.DATAPACKET;
}
@Override
public final String toString() {
return "DataPacket: " + packetRank + ':' + lengthPacket;
}
/**
* @return the packetRank
*/
public final int getPacketRank() {
return packetRank;
}
/**
* @return the lengthPacket
*/
public final int getLengthPacket() {
return lengthPacket;
}
/**
* Transform the DataPacket to have a byte array instead of ByteBuf
*
* @param session to allow to get reusable buffer
*/
public final synchronized void createByteBufFromRecv(
final R66Session session) {
// Get reusable buffer and set internal content to byte Array
if (data == null) {
final byte[] buffer = session.getReusableDataPacketBuffer(lengthPacket);
dataRecv.getBytes(dataRecv.readerIndex(), buffer, 0, lengthPacket);
data = buffer;
WaarpNettyUtil.release(dataRecv);
dataRecv = null;
}
}
/**
* @return the data
*/
public final byte[] getData() {
ParametersChecker.checkParameter("Data is not setup correctly", data,
logger);
return data;
}
/**
* @return the key
*/
public final byte[] getKey() {
return key;
}
/**
* @return True if the Hashed key is valid (or no key is set)
*/
public final boolean isKeyValid(final FilesystemBasedDigest digestBlock,
final FilesystemBasedDigest digestGlobal,
final FilesystemBasedDigest digestLocal) {
ParametersChecker.checkParameter("Data is not setup correctly", data,
logger);
if (key == null || key.length == 0) {
if (digestGlobal != null || digestLocal != null) {
FileUtils.computeGlobalHash(digestGlobal, digestLocal, data,
lengthPacket);
}
logger.error("Should received a Digest but don't");
return false;
}
digestBlock.Update(data, 0, lengthPacket);
final byte[] newkey = digestBlock.Final();
FileUtils.computeGlobalHash(digestGlobal, digestLocal, data, lengthPacket);
final boolean equal = Arrays.equals(key, newkey);
if (!equal) {
logger.error("DIGEST {} != {} for {} bytes using {} at rank {}",
FilesystemBasedDigest.getHex(key),
FilesystemBasedDigest.getHex(newkey), lengthPacket,
digestBlock.getAlgo(), packetRank);
} else if (logger.isDebugEnabled()) {
logger.debug("DIGEST {} == {} for {} bytes using {} at rank {}",
FilesystemBasedDigest.getHex(key),
FilesystemBasedDigest.getHex(newkey), lengthPacket,
digestBlock.getAlgo(), packetRank);
}
return equal;
}
@Override
public final synchronized void clear() {
super.clear();
WaarpNettyUtil.release(dataRecv);
dataRecv = null;
data = null;
key = null;
lengthPacket = 0;
}
}