NetEase Lede Technical Team

A Study of Writing from Flume to Elasticsearch

Version Problems When Writing from Flume to Elasticsearch

Overview

The data collection tool our team currently uses is flume-ng, mainly in the following scenarios:
1) Logs -> Flume -> real-time computation (Storm, Spark Streaming)
2) Logs -> Flume -> offline storage (Hive, HDFS, HBase) -> offline computation (Spark, spark-mllib)
3) Logs -> Flume -> ElasticSearch
When using Flume to write the collected data to elasticsearch, we ran into version incompatibility problems; the Flume version is 1.6 and the elasticsearch version is 2.3. This post records how those incompatibilities were discovered and resolved.

Problem 1: Description

  As the title says, when writing data from Flume to Elasticsearch, Flume always failed at startup with the following error:

(org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run:251) - Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@33fbabbd counterGroup:{ name:null counters:{} } }
- Exception follows.
java.lang.NoSuchMethodError:
org.elasticsearch.common.transport.InetSocketTransportAddress.<init>(Ljava/lang/String;I)V
at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.configureHostnames(ElasticSearchTransportClient.java:141)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchTransportClient.<init>(ElasticSearchTransportClient.java:77)
at org.apache.flume.sink.elasticsearch.client.ElasticSearchClientFactory.getClient(ElasticSearchClientFactory.java:48)
at org.apache.flume.sink.elasticsearch.ElasticSearchSink.start(ElasticSearchSink.java:358)
at org.apache.flume.sink.DefaultSinkProcessor.start(DefaultSinkProcessor.java:45)
at org.apache.flume.SinkRunner.start(SinkRunner.java:79)

  Searching the web for the exception message Unable to start SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@33fbabbd counterGroup:{ name:null counters:{} } } turned up the cause of the failure: elasticsearch 2.3 is too new, and the official Flume release is not yet compatible with elasticsearch versions 2.0 and above.

Problem 1: Solution

  With the cause identified, the simplest fix is to downgrade elasticsearch below 1.0, i.e. back to the 0.90.x line that the Flume sink was built against. This approach is not recommended, though; to keep the higher ES version, you can make the following adjustment instead: rewrite the code in flume-ng-elasticsearch-sink.jar.

Following the exception stack, the problem is located in ElasticSearchTransportClient.java:

private void configureHostnames(String[] hostNames) {
    logger.warn(Arrays.toString(hostNames));
    serverAddresses = new InetSocketTransportAddress[hostNames.length];
    for (int i = 0; i < hostNames.length; i++) {
        String[] hostPort = hostNames[i].trim().split(":");
        String host = hostPort[0].trim();
        int port = hostPort.length == 2 ? Integer.parseInt(hostPort[1].trim())
                : DEFAULT_PORT;
        // This (String, int) constructor no longer exists in elasticsearch 2.x,
        // hence the NoSuchMethodError at runtime.
        serverAddresses[i] = new InetSocketTransportAddress(host, port);
    }
}

  The problem is the new InetSocketTransportAddress(host, port) constructor call. Note that this is not a compile-time error: the constructor is resolved at runtime against whichever elasticsearch jar is on the classpath, which is why it surfaces as a NoSuchMethodError. InetSocketTransportAddress is a class in the elasticsearch jar, and after downloading elasticsearch-2.3.jar we confirmed that InetSocketTransportAddress indeed no longer has an InetSocketTransportAddress(String, int) constructor; its remaining constructors are:

public InetSocketTransportAddress(StreamInput in) throws IOException {
    final int len = in.readByte();
    final byte[] a = new byte[len]; // 4 bytes (IPv4) or 16 bytes (IPv6)
    in.readFully(a);
    InetAddress inetAddress = InetAddress.getByAddress(a);
    int port = in.readInt();
    this.address = new InetSocketAddress(inetAddress, port);
}

private InetSocketTransportAddress() {
    address = null;
}

public InetSocketTransportAddress(InetAddress address, int port) {
    this(new InetSocketAddress(address, port));
}

public InetSocketTransportAddress(InetSocketAddress address) {
    if (address == null) {
        throw new IllegalArgumentException("InetSocketAddress must not be null");
    }
    if (address.getAddress() == null) {
        throw new IllegalArgumentException("Address must be resolved but wasn't - InetSocketAddress#getAddress() returned null");
    }
    this.address = address;
}
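
  A quick way to confirm which constructors a given elasticsearch jar actually exposes, without reading its source, is reflection. This is a small sketch (assuming the jar under test is the only elasticsearch jar on the classpath):

import java.lang.reflect.Constructor;

public class CtorCheck {
    public static void main(String[] args) throws Exception {
        Class<?> c = Class.forName(
                "org.elasticsearch.common.transport.InetSocketTransportAddress");
        // Against elasticsearch-2.3.x this lists no (String, int) constructor;
        // against elasticsearch-0.90.0 it does.
        for (Constructor<?> ctor : c.getDeclaredConstructors()) {
            System.out.println(ctor);
        }
    }
}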

  So why is there no such problem when Flume connects to elasticsearch 0.90.0? Downloading elasticsearch-0.90.0.jar and opening the 0.90.0 version of InetSocketTransportAddress.java shows:

public InetSocketTransportAddress(String hostname, int port) { // removed in 2.x
    this(new InetSocketAddress(hostname, port));
}

public InetSocketTransportAddress(InetAddress address, int port) {
    this(new InetSocketAddress(address, port));
}

public InetSocketTransportAddress(InetSocketAddress address) {
    this.address = address;
}

  As you can see, in elasticsearch-0.90.0.jar InetSocketTransportAddress has three constructors, and the InetSocketTransportAddress(String, int) one is indeed among them. The final fix: unpack flume-elasticsearch-sink.jar, rewrite the places in ElasticSearchTransportClient that call the removed InetSocketTransportAddress constructor, repackage the code as flume-elasticsearch-sink.jar, and upload it to the lib directory on the Flume server.

private void configureHostnames(String[] hostNames) {
    logger.warn(Arrays.toString(hostNames));
    this.serverAddresses = new InetSocketTransportAddress[hostNames.length];
    for (int i = 0; i < hostNames.length; ++i) {
        String[] hostPort = hostNames[i].trim().split(":");
        String host = hostPort[0].trim();
        int port = hostPort.length == 2 ? Integer.parseInt(hostPort[1].trim()) : 9300;
        // Build an InetSocketAddress and use the (InetSocketAddress)
        // constructor, which still exists in elasticsearch 2.x.
        this.serverAddresses[i] = new InetSocketTransportAddress(new InetSocketAddress(host, port));
    }
}
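
  Note that the elasticsearch 2.x constructor shown earlier rejects unresolved addresses. If you prefer the resolution step and its failure mode to be explicit, the loop body above could instead call a small helper like this (a sketch using plain JDK name resolution; the helper name is ours):

import java.net.InetAddress;
import java.net.UnknownHostException;

import org.elasticsearch.common.transport.InetSocketTransportAddress;

// Resolve the host explicitly, so an unresolvable name fails here with
// UnknownHostException rather than with IllegalArgumentException inside
// the elasticsearch 2.x constructor.
private static InetSocketTransportAddress toTransportAddress(String host, int port)
        throws UnknownHostException {
    InetAddress resolved = InetAddress.getByName(host);
    return new InetSocketTransportAddress(resolved, port);
}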

Problem 2: Description

  With the problem above solved, Flume starts cleanly. We then began testing writes into ES, and hit another exception caused by the ES version:

source[{"body":{"datagramId":"a__0_test","communicationId":"123456","bizType":2},"protocol":"json","persisttype":"elasticsearch,kafka","flume.client.log4j.timestamp":"1484297513292","flume.client.log4j.logger.name":"RobertLogger","flume.client.log4j.log.level":"20000","flume.client.log4j.message.encoding":"UTF8","topic":"datagram","timestamp":"1484297513372"}]}
MapperParsingException[Field name [flume.client.log4j.log.level] cannot contain '.']
        at org.elasticsearch.index.mapper.object.ObjectMapper$TypeParser.parseProperties(ObjectMapper.java:273)

Problem 2: Solution

  First, the main ES concepts map onto relational-database concepts as follows:
    Relational DB -> Databases -> Tables -> Rows -> Columns
    Elasticsearch -> Indices -> Types -> Documents -> Fields
  As the exception message says, field names must not contain ".". But all of the dotted fields here are headers that log4j adds to the event when it writes messages to Flume.
  Looking at the ObjectMapper.java class (in elasticsearch-2.3.3.jar) named in the exception:

protected static void parseProperties(ObjectMapper.Builder objBuilder, Map<String, Object> propsNode, ParserContext parserContext) {
    Iterator<Map.Entry<String, Object>> iterator = propsNode.entrySet().iterator();
    while (iterator.hasNext()) {
        Map.Entry<String, Object> entry = iterator.next();
        String fieldName = entry.getKey();
        // the check that rejects field names containing "."
        if (fieldName.contains(".")) {
            throw new MapperParsingException("Field name [" + fieldName + "] cannot contain '.'");
        }
        // ...
    }
}

  So field parsing really does validate against "." in field names.
ObjectMapper.java in elasticsearch-0.90.0.jar, by contrast, has no such check:

private void parseProperties(ObjectMapper.Builder objBuilder, Map<String, Object> propsNode, ParserContext parserContext) {
    for (Map.Entry<String, Object> entry : propsNode.entrySet()) {
        String propName = entry.getKey();
        Map<String, Object> propNode = (Map<String, Object>) entry.getValue();

        String type;
        Object typeNode = propNode.get("type");
        if (typeNode != null) {
            type = typeNode.toString();
        } else {
            // lets see if we can derive this...
            if (propNode.get("properties") != null) {
                type = ObjectMapper.CONTENT_TYPE;
            } else if (propNode.get("fields") != null) {
                type = MultiFieldMapper.CONTENT_TYPE;
            } else if (propNode.size() == 1 && propNode.get("enabled") != null) {
                // if there is a single property with the enabled flag on it, make it an object
                // (usually, setting enabled to false to not index any type, including core values, which
                // non enabled object type supports).
                type = ObjectMapper.CONTENT_TYPE;
            } else {
                throw new MapperParsingException("No type specified for property [" + propName + "]");
            }
        }

        Mapper.TypeParser typeParser = parserContext.typeParser(type);
        if (typeParser == null) {
            throw new MapperParsingException("No handler for type [" + type + "] declared on field [" + propName + "]");
        }
        objBuilder.add(typeParser.parse(propName, propNode, parserContext));
    }
}

The code inside elasticsearch-2.3.3.jar cannot be modified, so there are two ways around the problem:

Option 1: when log4j sends messages to Flume, either don't send the default header fields shown below, or rename them.

This requires rewriting the Log4jAvroHeaders class shipped in Flume's log4j appender jar (flume-ng-log4jappender) to change the header names:

public enum Log4jAvroHeaders {
    OTHER("flume.client.log4j.logger.other"),
    LOGGER_NAME("flume.client.log4j.logger.name"),
    LOG_LEVEL("flume.client.log4j.log.level"),
    MESSAGE_ENCODING("flume.client.log4j.message.encoding"),
    TIMESTAMP("flume.client.log4j.timestamp"),
    AVRO_SCHEMA_LITERAL("flume.avro.schema.literal"),
    AVRO_SCHEMA_URL("flume.avro.schema.url");

    private final String headerName;

    Log4jAvroHeaders(String headerName) {
        this.headerName = headerName;
    }

    public String getName() {
        return headerName;
    }
}
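
As a variant of option 1 that avoids rebuilding the appender jar, the same renaming could be done in flight with a custom Flume interceptor registered on the avro source. This is only a sketch; the class and package names are hypothetical:

package com.example.flume; // hypothetical package

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;

// Renames headers like "flume.client.log4j.log.level" to
// "flume_client_log4j_log_level" so elasticsearch 2.x accepts them as fields.
public class DotHeaderRenameInterceptor implements Interceptor {

    @Override
    public void initialize() {}

    @Override
    public Event intercept(Event event) {
        Map<String, String> headers = event.getHeaders();
        Map<String, String> renamed = new HashMap<String, String>(headers.size());
        for (Map.Entry<String, String> e : headers.entrySet()) {
            renamed.put(e.getKey().replace('.', '_'), e.getValue());
        }
        headers.clear();
        headers.putAll(renamed);
        return event;
    }

    @Override
    public List<Event> intercept(List<Event> events) {
        for (Event e : events) {
            intercept(e);
        }
        return events;
    }

    @Override
    public void close() {}

    public static class Builder implements Interceptor.Builder {
        @Override
        public Interceptor build() {
            return new DotHeaderRenameInterceptor();
        }

        @Override
        public void configure(Context context) {}
    }
}

The interceptor would then be wired up in flume-conf.properties through the source's interceptors list, with the interceptor type pointing at the Builder class.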

Option 2: rewrite the flume-elasticsearch-sink.jar code so that when Flume writes to ES it writes only the event.body information, not the event.header information.

After unpacking flume-elasticsearch-sink.jar, add a switch to ElasticSearchDynamicSerializer.java and ElasticSearchLogStashEventSerializer.java that controls whether headers are sent, with its value set in the Flume configuration file; or simply comment out the this.appendHeaders(builder, event) call. Then repackage the code as flume-elasticsearch-sink.jar and upload it to the lib directory on the Flume server.

public XContentBuilder getContentBuilder(Event event) throws IOException {
    XContentBuilder builder = XContentFactory.jsonBuilder().startObject();
    this.appendBody(builder, event);
    // Read "writeheader" from the flume configuration; it defaults to false,
    // i.e. the flume headers are not sent to ES unless explicitly enabled.
    if (context.getBoolean("writeheader", false)) {
        this.appendHeaders(builder, event);
    }
    return builder;
}
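
For that switch to read anything, the serializer also needs to keep the Context it is configured with; the stock ElasticSearchDynamicSerializer leaves configure() empty, so a field has to be added. A minimal sketch (whether the sink's properties are actually propagated into the serializer's Context should be verified against your Flume version):

private Context context;

@Override
public void configure(Context context) {
    // Keep the configuration so getContentBuilder() can look up "writeheader".
    this.context = context;
}

The flag itself would then sit alongside the other sink properties, e.g. rec.sinks.esSink.writeheader = true (a hypothetical key matching the lookup above).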

This time, for Robert, we went with option 2: per the business requirements, all fields that need indexing are in event.body, so there is no need to index the header information in ES as well.

Appendix:

1. flume-conf.properties configuration:

rec.sources = recSource
rec.channels = esChannel
rec.sinks = esSink

rec.sources.recSource.type = avro
rec.sources.recSource.bind = 0.0.0.0
rec.sources.recSource.port = 41414
rec.sources.recSource.channels = esChannel
rec.sources.recSource.selector.type = multiplexing

#elasticsearch
rec.channels.esChannel.type = memory
rec.channels.esChannel.capacity = 10000
rec.channels.esChannel.transactionCapacity = 1000
rec.channels.esChannel.keep-alive = 30

rec.sinks.esSink.channel = esChannel
rec.sinks.esSink.type = ElasticSearchSink
rec.sinks.esSink.hostNames = xxx.xxx.xxx.xxx:9300
rec.sinks.esSink.indexName = robert
rec.sinks.esSink.batchSize = 100
rec.sinks.esSink.indexType = %{topic}
rec.sinks.esSink.clusterName = test_search
rec.sinks.esSink.serializer = ElasticSearchDynamicSerializer
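
2. For completeness, a minimal test client for the avro source above (a sketch built on the flume-ng-sdk RPC client rather than the log4j appender; the host and message body are placeholders):

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

import org.apache.flume.Event;
import org.apache.flume.api.RpcClient;
import org.apache.flume.api.RpcClientFactory;
import org.apache.flume.event.EventBuilder;

public class EsSinkSmokeTest {
    public static void main(String[] args) throws Exception {
        // Connect to the avro source defined in flume-conf.properties above.
        RpcClient client = RpcClientFactory.getDefaultInstance("xxx.xxx.xxx.xxx", 41414);
        try {
            Map<String, String> headers = new HashMap<String, String>();
            // "topic" feeds the %{topic} substitution in rec.sinks.esSink.indexType.
            headers.put("topic", "datagram");
            Event event = EventBuilder.withBody(
                    "{\"datagramId\":\"a__0_test\"}".getBytes(StandardCharsets.UTF_8),
                    headers);
            client.append(event); // throws EventDeliveryException on failure
        } finally {
            client.close();
        }
    }
}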