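在使用 Spark 向 HBase 主从表写入数据时,程序报出对象未序列化(不可序列化)的错误。经过检查,问题出在 HBase 连接配置的写法上:Configuration、Connection、Table 是在 foreachPartition 之外(Driver 端)创建的,这些对象无法被序列化后随闭包发送到 Executor,因此应当在 foreachPartition 内部创建。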
原写法:Configuration 配置在 foreachPartition 外部
// "Before" version: the HBase configuration, connection and table handles are all
// created outside the foreachPartition lambda.
// NOTE(review): the accompanying article reports that this version fails with a
// "not serialized" error; presumably Spark cannot ship these HBase objects with the
// closure to the executors - confirm against the actual stack trace.
// Configure the HBase connection.
Configuration hbaseConfig = HBaseConfiguration.create();
hbaseConfig.set("hbase.zookeeper.quorum", "192.168.1.160");
//hbaseConfig.set("hbase.zookeeper.property.clientPort", "2181");
// Connect to HBase.
Connection connection = ConnectionFactory.createConnection(hbaseConfig);
Table mainTable = connection.getTable(TableName.valueOf("main_table"));
Table subTable = connection.getTable(TableName.valueOf("sub_table"));
// Write the sub-table rows, one partition at a time.
result.toJavaRDD().foreachPartition(partition -> {
// Debug marker printed once per partition.
System.err.println("!");
partition.forEachRemaining(row -> {
try {
// Row key is mainTableId + "_" + column 0 of the row (the MMSI).
String subTableId = mainTableId + "_" + row.getString(0); // mainTableId + MMSI
Put subPut = new Put(Bytes.toBytes(subTableId));
// All columns go into family "cf"; values come from row columns 0..3.
subPut.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("main_id"), Bytes.toBytes(mainTableId));
subPut.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("MMSI"), Bytes.toBytes(row.getString(0)));
subPut.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("passage_count"), Bytes.toBytes(row.getLong(1)));
subPut.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("total_distance"), Bytes.toBytes(row.getDouble(2)));
subPut.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("total_time"), Bytes.toBytes(row.getLong(3)));
// The actual write is commented out here, so this version only builds the Put.
//subTable.put(subPut);
} catch (IOException e) {
// Errors are only printed; processing continues with the next row.
e.printStackTrace();
}
});
});
修改后写法:Configuration 配置放入 foreachPartition 中
// Write the sub-table rows.
// Everything HBase-related (Configuration, Connection, Table) is created inside the
// foreachPartition lambda, so no HBase object is captured by the closure; each
// partition opens its own connection and closes it when the partition is done.
result.toJavaRDD().foreachPartition(partition -> {
    Configuration hbaseConfig1 = HBaseConfiguration.create();
    hbaseConfig1.set("hbase.zookeeper.quorum", "192.168.1.160");

    // Column family / qualifiers and the main-table id never change within a
    // partition, so encode them once instead of once per row.
    final byte[] family = Bytes.toBytes("cf");
    final byte[] qMainId = Bytes.toBytes("main_id");
    final byte[] qMmsi = Bytes.toBytes("MMSI");
    final byte[] qPassageCount = Bytes.toBytes("passage_count");
    final byte[] qTotalDistance = Bytes.toBytes("total_distance");
    final byte[] qTotalTime = Bytes.toBytes("total_time");
    final byte[] mainIdBytes = Bytes.toBytes(mainTableId);

    // Number of Puts sent per batch; bounds memory while avoiding one RPC per row.
    final int flushThreshold = 1000;

    // No catch here on purpose: a failure to connect or write propagates out of the
    // lambda and fails the Spark task (so it can be retried) instead of being
    // printed and silently dropping rows.
    try (Connection connection = ConnectionFactory.createConnection(hbaseConfig1);
         Table subTable = connection.getTable(TableName.valueOf("sub_table"))) {
        final java.util.List<Put> pending = new java.util.ArrayList<>(flushThreshold);

        partition.forEachRemaining(row -> {
            String mmsi = row.getString(0);
            // Row key: mainTableId + "_" + MMSI
            Put subPut = new Put(Bytes.toBytes(mainTableId + "_" + mmsi));
            subPut.addColumn(family, qMainId, mainIdBytes);
            subPut.addColumn(family, qMmsi, Bytes.toBytes(mmsi));
            subPut.addColumn(family, qPassageCount, Bytes.toBytes(row.getLong(1)));
            subPut.addColumn(family, qTotalDistance, Bytes.toBytes(row.getDouble(2)));
            subPut.addColumn(family, qTotalTime, Bytes.toBytes(row.getLong(3)));
            pending.add(subPut);

            if (pending.size() >= flushThreshold) {
                try {
                    subTable.put(pending);
                } catch (IOException e) {
                    // Consumer cannot throw checked exceptions; rethrow unchecked,
                    // keeping the cause, so the task fails visibly.
                    throw new java.io.UncheckedIOException("Failed to write batch to sub_table", e);
                }
                pending.clear();
            }
        });

        // Send whatever is left over from the last, partially filled batch.
        if (!pending.isEmpty()) {
            subTable.put(pending);
        }
    }
});
文章评论