Moved to using XContentBuilder internal byte buffers instead of writing connection.outputStream to avoid flushing and chunked requests. Also added content type to dashboard and template uploading.

This commit is contained in:
Boaz Leskes 2013-12-25 14:26:27 +01:00
parent f4ac153336
commit 4b1bda9ea4

View file

@ -193,18 +193,19 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
OutputStream os = conn.getOutputStream();
// TODO: find a way to disable builder's substream flushing or something neat solution
for (int i = 0; i < renderer.length(); i++) {
XContentBuilder builder = XContentFactory.smileBuilder(os);
XContentBuilder builder = XContentFactory.smileBuilder();
builder.startObject().startObject("index")
.field("_index", getIndexName())
.field("_type", renderer.type(i))
.endObject().endObject();
builder.flush();
os.write(builder.bytes().array(), builder.bytes().arrayOffset(), builder.bytes().length());
os.write(SmileXContent.smileXContent.streamSeparator());
builder = XContentFactory.smileBuilder(os);
builder = XContentFactory.smileBuilder();
builder.humanReadable(false);
renderer.render(i, builder);
builder.flush();
builder.close();
os.write(builder.bytes().array(), builder.bytes().arrayOffset(), builder.bytes().length());
os.write(SmileXContent.smileXContent.streamSeparator());
}
}
@ -214,10 +215,12 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
OutputStream os = conn.getOutputStream();
os.close();
InputStream inputStream = conn.getInputStream();
while (inputStream.read() != -1) ;
inputStream.close();
if (conn.getResponseCode() != 200) {
logConnectionError("remote target didn't respond with 200 OK", conn);
} else {
conn.getInputStream().close(); // close and release to connection pool.
}
}
@ -407,7 +410,7 @@ public class ESExporter extends AbstractLifecycleComponent<ESExporter> implement
// nothing there, lets create it
if (!hasDoc) {
logger.debug("no document found in elasticsearch for [{}]. Adding...", path);
conn = openConnection("PUT", path);
conn = openConnection("PUT", path, XContentType.JSON.restContentType());
OutputStream os = conn.getOutputStream();
Streams.copy(bytes, os);
if (!(conn.getResponseCode() == 200 || conn.getResponseCode() == 201)) {