1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.waarp.openr66.protocol.monitoring;
22
23 import com.fasterxml.jackson.databind.node.ObjectNode;
24 import io.netty.bootstrap.Bootstrap;
25 import io.netty.buffer.ByteBuf;
26 import io.netty.buffer.Unpooled;
27 import io.netty.channel.Channel;
28 import io.netty.channel.ChannelFuture;
29 import io.netty.channel.EventLoopGroup;
30 import io.netty.handler.codec.http.DefaultHttpHeaders;
31 import io.netty.handler.codec.http.DefaultHttpRequest;
32 import io.netty.handler.codec.http.DefaultLastHttpContent;
33 import io.netty.handler.codec.http.HttpHeaderNames;
34 import io.netty.handler.codec.http.HttpHeaderValues;
35 import io.netty.handler.codec.http.HttpHeaders;
36 import io.netty.handler.codec.http.HttpMethod;
37 import io.netty.handler.codec.http.HttpRequest;
38 import io.netty.handler.codec.http.HttpVersion;
39 import io.netty.handler.ssl.SslContext;
40 import io.netty.handler.ssl.SslContextBuilder;
41 import io.netty.handler.ssl.util.InsecureTrustManagerFactory;
42 import io.netty.util.internal.SocketUtils;
43 import org.joda.time.DateTime;
44 import org.waarp.common.future.WaarpFuture;
45 import org.waarp.common.logging.WaarpLogger;
46 import org.waarp.common.logging.WaarpLoggerFactory;
47 import org.waarp.common.utility.WaarpNettyUtil;
48 import org.waarp.common.utility.WaarpStringUtils;
49 import org.waarp.openr66.protocol.configuration.Configuration;
50 import org.waarp.openr66.protocol.http.restv2.utils.JsonUtils;
51 import org.waarp.openr66.protocol.networkhandler.ssl.NetworkSslServerInitializer;
52
53 import javax.net.ssl.SSLException;
54 import java.io.Closeable;
55 import java.io.IOException;
56 import java.io.UnsupportedEncodingException;
57 import java.net.URI;
58 import java.net.URISyntaxException;
59
60 import static org.waarp.openr66.protocol.configuration.Configuration.*;
61
62
63
64
65 public class HttpMonitoringExporterClient implements Closeable {
66 private static final WaarpLogger logger =
67 WaarpLoggerFactory.getLogger(HttpMonitoringExporterClient.class);
68 public static final String HTTPS = "https";
69 public static final String BASIC = "Basic ";
70 public static final String BEARER = "Bearer ";
71 public static final String API_KEY = "ApiKey ";
72
73 private final URI finalUri;
74 private final String host;
75 private final boolean keepConnection;
76 private final Bootstrap bootstrap;
77 private int port;
78 private final String basicAuthent;
79 private final String token;
80 private final String apiKey;
81 private Channel remoteRestChannel = null;
82
83 private WaarpFuture futurePost = null;
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101 public HttpMonitoringExporterClient(final String remoteBaseUrl,
102 final String basicAuthent,
103 final String token, final String apiKey,
104 final String endpoint,
105 final boolean keepConnection,
106 final EventLoopGroup group) {
107 this.keepConnection = keepConnection;
108 this.token = token;
109 this.basicAuthent = basicAuthent;
110 this.apiKey = apiKey;
111 final String uri;
112 if (remoteBaseUrl.endsWith("/")) {
113 uri = remoteBaseUrl + endpoint;
114 } else {
115 uri = remoteBaseUrl + "/" + endpoint;
116 }
117
118 try {
119 finalUri = new URI(uri);
120 } catch (final URISyntaxException e) {
121 logger.error("URI syntax error: {}", e.getMessage());
122 throw new IllegalArgumentException(e);
123 }
124 final String scheme =
125 finalUri.getScheme() == null? "http" : finalUri.getScheme();
126 host = finalUri.getHost() == null? "127.0.0.1" : finalUri.getHost();
127 port = finalUri.getPort();
128 if (port == -1) {
129 if ("http".equalsIgnoreCase(scheme)) {
130 port = 80;
131 } else if (HTTPS.equalsIgnoreCase(scheme)) {
132 port = 443;
133 }
134 }
135
136 if (!"http".equalsIgnoreCase(scheme) && !HTTPS.equalsIgnoreCase(scheme)) {
137 logger.error("Only HTTP(S) is supported.");
138 throw new IllegalArgumentException("Only HTTP(S) is supported.");
139 }
140
141 final boolean ssl = HTTPS.equalsIgnoreCase(scheme);
142 final SslContext sslCtx;
143 if (ssl) {
144 try {
145 sslCtx = SslContextBuilder.forClient().keyManager(
146 NetworkSslServerInitializer.getWaarpSecureKeyStore()
147 .getKeyManagerFactory()).trustManager(
148 InsecureTrustManagerFactory.INSTANCE).build();
149 } catch (final SSLException e) {
150 logger.error("SslContext error", e);
151 throw new IllegalArgumentException(e);
152 }
153 } else {
154 sslCtx = null;
155 }
156
157
158 bootstrap = new Bootstrap();
159 WaarpNettyUtil.setBootstrap(bootstrap, group,
160 (int) Configuration.configuration.getTimeoutCon(),
161 configuration.getBlockSize() + 64, true);
162 bootstrap.handler(
163 new HttpMonitoringExporterClientInitializer(sslCtx, this));
164 }
165
166
167
168
169
170
171
172
173
174 public final boolean post(final ObjectNode monitoredTransfers,
175 final DateTime start, final DateTime stop,
176 final String serverId) {
177 logger.debug("Start Post from {} to {} as {}", start, stop, serverId);
178 if (keepConnection && remoteRestChannel != null &&
179 !remoteRestChannel.isActive()) {
180 remoteRestChannel.close();
181 remoteRestChannel = null;
182 }
183 if (remoteRestChannel == null) {
184 final ChannelFuture future =
185 bootstrap.connect(SocketUtils.socketAddress(host, port));
186 try {
187 remoteRestChannel = future.sync().channel();
188 } catch (final InterruptedException e) {
189 logger.error(e);
190 return false;
191 }
192 }
193 futurePost = new WaarpFuture(true);
194
195 final String body = JsonUtils.nodeToString(monitoredTransfers);
196 final int length;
197 final byte[] bbody;
198 try {
199 bbody = body.getBytes(WaarpStringUtils.UTF_8);
200 length = body.length();
201 } catch (final UnsupportedEncodingException e) {
202 logger.error(e.getMessage());
203 return false;
204 }
205 final ByteBuf buf = Unpooled.wrappedBuffer(bbody);
206 final HttpHeaders headers = new DefaultHttpHeaders(true);
207
208 headers.set(HttpHeaderNames.HOST, host);
209 if (keepConnection) {
210 headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.KEEP_ALIVE);
211 } else {
212 headers.set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE);
213 }
214 if (basicAuthent != null) {
215 headers.set(HttpHeaderNames.AUTHORIZATION, BASIC + basicAuthent);
216 } else if (token != null) {
217 headers.set(HttpHeaderNames.AUTHORIZATION, BEARER + token);
218 } else if (apiKey != null) {
219 headers.set(HttpHeaderNames.AUTHORIZATION, API_KEY + apiKey);
220 }
221 headers.set(MonitorExporterTransfers.HEADER_WAARP_ID, serverId);
222 headers.set(MonitorExporterTransfers.HEADER_WAARP_START,
223 start == null? "" : start.toString());
224 headers.set(MonitorExporterTransfers.HEADER_WAARP_STOP, stop.toString());
225 headers.set(HttpHeaderNames.CONTENT_LENGTH, length);
226 headers.set(HttpHeaderNames.CONTENT_TYPE,
227 HttpHeaderValues.APPLICATION_JSON);
228 final HttpRequest request =
229 new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST,
230 finalUri.toASCIIString(), headers);
231
232 logger.debug("Request from {} to {} as {} length {}", start, stop, serverId,
233 length);
234 logger.debug("{} {} {} {}", request.method(), request.uri(),
235 request.protocolVersion(), request.headers());
236 remoteRestChannel.write(request);
237 remoteRestChannel.write(buf);
238 remoteRestChannel.writeAndFlush(DefaultLastHttpContent.EMPTY_LAST_CONTENT)
239 .awaitUninterruptibly();
240 logger.debug("Ending Post from {} to {} as {}", start, stop, serverId);
241
242 if (!keepConnection) {
243 try {
244 logger.debug("Wait for Close connection");
245 remoteRestChannel.closeFuture().sync();
246 remoteRestChannel = null;
247 } catch (final InterruptedException e) {
248 logger.error(e);
249
250 }
251 }
252 futurePost.awaitOrInterruptible();
253 final boolean result = futurePost.isSuccess();
254 logger.info("End Post from {} to {} as {} with {}", start, stop, serverId,
255 result);
256 return result;
257 }
258
259 public final boolean isKeepConnection() {
260 return keepConnection;
261 }
262
263 public final void setStatus(final boolean ok) {
264 if (ok) {
265 futurePost.setSuccess();
266 } else {
267 futurePost.cancel();
268 }
269 }
270
271 @Override
272 public final void close() throws IOException {
273 if (remoteRestChannel != null && !remoteRestChannel.isActive()) {
274 remoteRestChannel.close();
275 remoteRestChannel = null;
276 }
277 }
278 }