在项目中遇到这样一个需求:DWS 数据层的数据最终要保存到 Elasticsearch 中。对于流数据,官方提供了 ElasticsearchSinkFunction;但对于批处理数据,似乎没有提供 ElasticsearchOutputFormat 这样可以直接使用的类,说白了还是要我们自己去定义,所以也只好老夫自己动动手。不过我居然找到了一个现成的实现,就不需要老夫动手了——老夫自己动手也写不出来这么好的。

我们可以看下官网对于自定义输出 OutputFormat 的说明——好像没什么说明。这里老夫也不贴全部的程序了,只贴用到的 ElasticsearchOutputFormat.java 的代码,其他的程序在 GitHub 上,可自行查看。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package com.sun.api.flink.sink.es;

import com.google.common.base.Preconditions;
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;

import org.apache.flink.configuration.Configuration;
import org.apache.http.HttpHost;
import org.apache.log4j.Logger;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.client.RestHighLevelClient;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Elasticsearch output format whose client type is {@link RestHighLevelClient}.
 *
 * <p>All work is delegated to {@code AbstractElasticsearchOutputFormat}; this class only wires in an
 * {@code Elasticsearch6ApiCallBridge} built from the configured hosts and REST client factory.
 * The constructor is private, so instances are obtained through {@link Builder}.
 *
 * @param <T> type of the records handed to the {@code ElasticsearchSinkFunction}
 */
public class ElasticsearchOutputFormat<T> extends AbstractElasticsearchOutputFormat<T, RestHighLevelClient> {

    private static final Logger LOG = Logger.getLogger(ElasticsearchOutputFormat.class);
    private static final long serialVersionUID = 1L;

    /**
     * Creates the output format; only reachable through {@link Builder#build()}.
     *
     * @param bulkRequestsConfig string-encoded bulk settings keyed by the {@code CONFIG_KEY_BULK_FLUSH_*} constants
     * @param httpHosts hosts passed to the {@code Elasticsearch6ApiCallBridge}
     * @param elasticsearchSinkFunction user function forwarded to the superclass
     * @param failureHandler handler forwarded to the superclass
     * @param restClientFactory factory passed to the {@code Elasticsearch6ApiCallBridge}
     */
    private ElasticsearchOutputFormat(
            Map<String, String> bulkRequestsConfig,
            List<HttpHost> httpHosts,
            ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
            DocWriteRequestFailureHandler failureHandler,
            RestClientFactory restClientFactory) {
        super(
                new Elasticsearch6ApiCallBridge(httpHosts, restClientFactory),
                bulkRequestsConfig,
                elasticsearchSinkFunction,
                failureHandler);
    }

    /**
     * Intentionally empty: everything this format needs is supplied through the {@link Builder}.
     *
     * @param configuration ignored
     */
    @Override
    public void configure(Configuration configuration) {
        // do nothing here
    }

    /**
     * Fluent builder for {@link ElasticsearchOutputFormat}.
     *
     * <p>Every setter returns {@code Builder<T>} (not the raw type) so that the record type
     * survives call chaining and {@link #build()} stays fully typed.
     *
     * @param <T> type of the records handed to the {@code ElasticsearchSinkFunction}
     */
    @PublicEvolving
    public static class Builder<T> {

        private final List<HttpHost> httpHosts;
        private final ElasticsearchSinkFunction<T> elasticsearchSinkFunction;

        // Never reassigned; only its contents change through the setters below.
        private final Map<String, String> bulkRequestsConfig = new HashMap<>();
        // Default handler rethrows every failure (see NoOpFailureHandler).
        private DocWriteRequestFailureHandler failureHandler = new NoOpFailureHandler();
        // Default factory leaves the REST client builder untouched.
        private RestClientFactory restClientFactory = restClientBuilder -> {
        };

        /**
         * Creates a builder with the two mandatory settings.
         *
         * @param httpHosts hosts to connect to; must not be {@code null}
         * @param elasticsearchSinkFunction user function; must not be {@code null}
         * @throws NullPointerException if either argument is {@code null}
         */
        public Builder(List<HttpHost> httpHosts, ElasticsearchSinkFunction<T> elasticsearchSinkFunction) {
            this.httpHosts = Preconditions.checkNotNull(httpHosts);
            this.elasticsearchSinkFunction = Preconditions.checkNotNull(elasticsearchSinkFunction);
        }

        /**
         * Stores the value under {@code CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS}.
         *
         * @param numMaxActions maximum number of buffered actions; must be greater than 0
         * @return this builder
         * @throws IllegalArgumentException if {@code numMaxActions <= 0}
         */
        public Builder<T> setBulkFlushMaxActions(int numMaxActions) {
            Preconditions.checkArgument(
                    numMaxActions > 0,
                    "Max number of buffered actions must be larger than 0.");
            LOG.info("Builder config: bulkFlushMaxActions=" + numMaxActions);
            this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, String.valueOf(numMaxActions));
            return this;
        }

        /**
         * Stores the value under {@code CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB}.
         *
         * @param maxSizeMb maximum size of buffered actions; must be greater than 0
         * @return this builder
         * @throws IllegalArgumentException if {@code maxSizeMb <= 0}
         */
        public Builder<T> setBulkFlushMaxSizeMb(int maxSizeMb) {
            Preconditions.checkArgument(
                    maxSizeMb > 0,
                    "Max size of buffered actions must be larger than 0.");
            LOG.info("Builder config: bulkFlushMaxSizeMb=" + maxSizeMb);
            this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB, String.valueOf(maxSizeMb));
            return this;
        }

        /**
         * Stores the value under {@code CONFIG_KEY_BULK_FLUSH_INTERVAL_MS}.
         *
         * @param intervalMillis interval between flushes in milliseconds; must be at least 0
         * @return this builder
         * @throws IllegalArgumentException if {@code intervalMillis < 0}
         */
        public Builder<T> setBulkFlushInterval(long intervalMillis) {
            Preconditions.checkArgument(
                    intervalMillis >= 0,
                    "Interval (in milliseconds) between each flush must be larger than or equal to 0.");
            LOG.info("Builder config: bulkFlushInterval=" + intervalMillis);
            this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS, String.valueOf(intervalMillis));
            return this;
        }

        /**
         * Stores the flag under {@code CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE}.
         *
         * @param enabled value of the backoff flag
         * @return this builder
         */
        public Builder<T> setBulkFlushBackoff(boolean enabled) {
            LOG.info("Builder config: bulkFlushBackoff=" + enabled);
            this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, String.valueOf(enabled));
            return this;
        }

        /**
         * Stores the type's {@code toString()} under {@code CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE}.
         *
         * @param flushBackoffType backoff type; must not be {@code null}
         * @return this builder
         * @throws NullPointerException if {@code flushBackoffType} is {@code null}
         */
        public Builder<T> setBulkFlushBackoffType(ElasticsearchApiCallBridge.FlushBackoffType flushBackoffType) {
            // Validate before logging so a rejected value is never reported as configured.
            Preconditions.checkNotNull(flushBackoffType);
            LOG.info("Builder config: bulkFlushBackoffType=" + flushBackoffType);
            this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE, flushBackoffType.toString());
            return this;
        }

        /**
         * Stores the value under {@code CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES}.
         *
         * @param maxRetries maximum number of backoff attempts; must be greater than 0
         * @return this builder
         * @throws IllegalArgumentException if {@code maxRetries <= 0}
         */
        public Builder<T> setBulkFlushBackoffRetries(int maxRetries) {
            Preconditions.checkArgument(
                    maxRetries > 0,
                    "Max number of backoff attempts must be larger than 0.");
            LOG.info("Builder config: bulkFlushBackoffRetries=" + maxRetries);
            this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES, String.valueOf(maxRetries));
            return this;
        }

        /**
         * Stores the value under {@code CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY}.
         *
         * @param delayMillis delay between backoff attempts in milliseconds; must be at least 0
         * @return this builder
         * @throws IllegalArgumentException if {@code delayMillis < 0}
         */
        public Builder<T> setBulkFlushBackoffDelay(long delayMillis) {
            Preconditions.checkArgument(
                    delayMillis >= 0,
                    "Delay (in milliseconds) between each backoff attempt must be larger than or equal to 0.");
            LOG.info("Builder config: bulkFlushBackoffDelay=" + delayMillis);
            this.bulkRequestsConfig.put(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY, String.valueOf(delayMillis));
            return this;
        }

        /**
         * Replaces the default failure handler.
         *
         * @param failureHandler handler to use; must not be {@code null}
         * @return this builder
         * @throws NullPointerException if {@code failureHandler} is {@code null}
         */
        public Builder<T> setFailureHandler(DocWriteRequestFailureHandler failureHandler) {
            // Validate before logging so a rejected value is never reported as configured.
            Preconditions.checkNotNull(failureHandler);
            LOG.info("Builder config: failureHandler=" + failureHandler);
            this.failureHandler = failureHandler;
            return this;
        }

        /**
         * Replaces the default REST client factory.
         *
         * @param restClientFactory factory to use; must not be {@code null}
         * @return this builder
         * @throws NullPointerException if {@code restClientFactory} is {@code null}
         */
        public Builder<T> setRestClientFactory(RestClientFactory restClientFactory) {
            // Validate before logging so a rejected value is never reported as configured.
            Preconditions.checkNotNull(restClientFactory);
            LOG.info("Builder config: restClientFactory=" + restClientFactory);
            this.restClientFactory = restClientFactory;
            return this;
        }

        /**
         * Creates the output format from the current builder state.
         *
         * @return a new {@link ElasticsearchOutputFormat}
         */
        public ElasticsearchOutputFormat<T> build() {
            // Hand over a snapshot so later setter calls on this builder cannot reach into
            // a format that has already been built.
            return new ElasticsearchOutputFormat<>(
                    new HashMap<>(bulkRequestsConfig),
                    httpHosts,
                    elasticsearchSinkFunction,
                    failureHandler,
                    restClientFactory);
        }
    }

    /**
     * Default failure handler: does no recovery and simply rethrows the failure it is given.
     */
    @Internal
    private static class NoOpFailureHandler implements DocWriteRequestFailureHandler {
        private static final long serialVersionUID = 737941343410827885L;

        /**
         * Rethrows {@code failure} unchanged; the other arguments are not used.
         *
         * @throws Throwable always, the given {@code failure}
         */
        @Override
        public void onFailure(DocWriteRequest action, Throwable failure, int restStatusCode, RequestIndexer indexer) throws Throwable {
            throw failure;
        }

    }
}