Java学习者论坛

 找回密码
 立即注册

QQ登录

只需一步,快速开始

手机号码,快捷登录

恭喜Java学习者论坛(https://www.javaxxz.com)已经为数万Java学习者服务超过8年了!积累会员资料超过10000G+
成为本站VIP会员,下载本站10000G+会员资源,购买链接:点击进入购买VIP会员
JAVA高级面试进阶视频教程Java架构师系统进阶VIP课程

分布式高可用全栈开发微服务教程

Go语言视频零基础入门到精通

Java架构师3期(课件+源码)

Java开发全终端实战租房项目视频教程

SpringBoot2.X入门到高级使用教程

大数据培训第六期全套视频教程

深度学习(CNN RNN GAN)算法原理

Java亿级流量电商系统视频教程

互联网架构师视频教程

年薪50万Spark2.0从入门到精通

年薪50万!人工智能学习路线教程

年薪50万!大数据从入门到精通学习路线年薪50万!机器学习入门到精通视频教程
仿小米商城类app和小程序视频教程深度学习数据分析基础到实战最新黑马javaEE2.1就业课程从 0到JVM实战高手教程 MySQL入门到精通教程
查看: 264|回复: 0

[默认分类] 开发系列:01、使用Java和Maven开发Spark应用

[复制链接]
  • TA的每日心情
    开心
    2021-12-13 21:45
  • 签到天数: 15 天

    [LV.4]偶尔看看III

    发表于 2018-5-15 21:16:17 | 显示全部楼层 |阅读模式
    1、POM.xml

    1.   1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    2.   2     xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    3.   3     <modelVersion>4.0.0</modelVersion>
    4.   4
    5.   5     <groupId>org.hansight.spark</groupId>
    6.   6     <artifactId>examples</artifactId>
    7.   7     <version>0.0.1-SNAPSHOT</version>
    8.   8     <packaging>jar</packaging>
    9.   9
    10. 10     <name>examples</name>
    11. 11     <url>http://maven.apache.org</url>
    12. 12
    13. 13     <properties>
    14. 14         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    15. 15         <elasticsearch.version>1.2.1</elasticsearch.version>
    16. 16         <jdk.version>1.7</jdk.version>
    17. 17         <logback.version>1.1.2</logback.version>
    18. 18         <slf4j.version>1.7.7</slf4j.version>
    19. 19         <junit.version>4.11</junit.version>
    20. 20         <jcl.over.slf4j.version>1.7.7</jcl.over.slf4j.version>
    21. 21         <metrics.version>3.0.2</metrics.version>
    22. 22         <avro.version>1.7.7</avro.version>
    23. 23         <jna.version>4.1.0</jna.version>
    24. 24         <spark.version>1.0.2</spark.version>
    25. 25     </properties>
    26. 26     <dependencies>
    27. 27         <dependency>
    28. 28             <groupId>junit</groupId>
    29. 29             <artifactId>junit</artifactId>
    30. 30             <version>3.8.1</version>
    31. 31             <scope>test</scope>
    32. 32         </dependency>
    33. 33         <dependency>
    34. 34             <groupId>com.fasterxml.jackson.core</groupId>
    35. 35             <artifactId>jackson-core</artifactId>
    36. 36             <version>2.4.2</version>
    37. 37         </dependency>
    38. 38         <dependency>
    39. 39             <groupId>com.google.guava</groupId>
    40. 40             <artifactId>guava</artifactId>
    41. 41             <version>14.0.1</version>
    42. 42             <scope>provided</scope>
    43. 43         </dependency>
    44. 44         <dependency>
    45. 45             <groupId>org.apache.spark</groupId>
    46. 46             <artifactId>spark-streaming-kafka_2.10</artifactId>
    47. 47             <version>${spark.version}</version>
    48. 48             <exclusions>
    49. 49                 <exclusion>
    50. 50                     <groupId>javax.servlet</groupId>
    51. 51                     <artifactId>servlet-api</artifactId>
    52. 52                 </exclusion>
    53. 53                 <exclusion>
    54. 54                     <groupId>org.apache.hadoop</groupId>
    55. 55                     <artifactId>hadoop-client</artifactId>
    56. 56                 </exclusion>
    57. 57             </exclusions>
    58. 58         </dependency>
    59. 59         <dependency>
    60. 60             <groupId>org.elasticsearch</groupId>
    61. 61             <artifactId>elasticsearch</artifactId>
    62. 62             <version>1.2.1</version>
    63. 63         </dependency>
    64. 64         <dependency>
    65. 65             <groupId>org.apache.hadoop</groupId>
    66. 66             <artifactId>hadoop-hdfs</artifactId>
    67. 67             <version>2.4.0.2.1.4.0-632</version>
    68. 68         </dependency>
    69. 69         <dependency>
    70. 70             <groupId>org.apache.hadoop</groupId>
    71. 71             <artifactId>hadoop-common</artifactId>
    72. 72             <version>2.4.0.2.1.4.0-632</version>
    73. 73             <exclusions>
    74. 74                 <exclusion>
    75. 75                     <groupId>jdk.tools</groupId>
    76. 76                     <artifactId>jdk.tools</artifactId>
    77. 77                 </exclusion>
    78. 78             </exclusions>
    79. 79         </dependency>
    80. 80         <dependency>
    81. 81             <groupId>org.apache.hadoop</groupId>
    82. 82             <artifactId>hadoop-mapreduce-client-common</artifactId>
    83. 83             <version>2.4.0.2.1.4.0-632</version>
    84. 84         </dependency>
    85. 85         <dependency>
    86. 86             <groupId>org.elasticsearch</groupId>
    87. 87             <artifactId>elasticsearch-hadoop</artifactId>
    88. 88             <version>2.0.1</version>
    89. 89         </dependency>
    90. 90     </dependencies>
    91. 91
    92. 92     <build>
    93. 93         <plugins>
    94. 94             <plugin>
    95. 95                 <groupId>org.apache.maven.plugins</groupId>
    96. 96                 <artifactId>maven-compiler-plugin</artifactId>
    97. 97                 <version>3.1</version>
    98. 98                 <configuration>
    99. 99                     <source>${jdk.version}</source>
    100. 100                     <target>${jdk.version}</target>
    101. 101                 </configuration>
    102. 102             </plugin>
    103. 103             <plugin>
    104. 104                 <groupId>org.apache.maven.plugins</groupId>
    105. 105                 <artifactId>maven-assembly-plugin</artifactId>
    106. 106                 <version>2.4</version>
    107. 107                 <configuration>
    108. 108                     <descriptorRefs>
    109. 109                         <descriptorRef>jar-with-dependencies</descriptorRef>
    110. 110                     </descriptorRefs>
    111. 111                 </configuration>
    112. 112             </plugin>
    113. 113         </plugins>
    114. 114     </build>
    115. 115 </project>
    复制代码



    2、样例代码

    1. 1 package com.hansight.spark.utils;
    2. 2
    3. 3 import org.apache.spark.SparkConf;
    4. 4 import org.apache.spark.api.java.JavaSparkContext;
    5. 5
    6. 6 public class SparkUtils {
    7. 7
    8. 8     public static JavaSparkContext get(String name) {
    9. 9         SparkConf conf = new SparkConf();
    10. 10         // conf.setMaster("local[1]");
    11. 11         // conf.setMaster("spark://hdp125:7077");
    12. 12         conf.setAppName(name);
    13. 13         return new JavaSparkContext(conf);
    14. 14     }
    15. 15 }
    复制代码



    1. 1 package com.hansight.spark.streaming;
    2. 2
    3. 3 import java.util.Iterator;
    4. 4
    5. 5 import org.apache.spark.api.java.JavaRDD;
    6. 6 import org.apache.spark.api.java.JavaSparkContext;
    7. 7 import org.apache.spark.api.java.function.Function;
    8. 8 import org.apache.spark.api.java.function.VoidFunction;
    9. 9
    10. 10 import com.hansight.spark.utils.SparkUtils;
    11. 11
    12. 12 public class HttpParser {
    13. 13     @SuppressWarnings({ "unchecked", "serial" })
    14. 14     public static void main(String[] args) {
    15. 15         if (args.length == 0) {
    16. 16             System.out.println("Usage: <file path>");
    17. 17             return;
    18. 18         }
    19. 19         System.setProperty("hadoop.home.dir", "E:/tools/hadoop-2.4.1");
    20. 20         JavaSparkContext sc = SparkUtils.get("HttpLog");
    21. 21         String path = args[0];
    22. 22         JavaRDD<String> rdd = sc
    23. 23                 .textFile(path);
    24. 24         JavaRDD<HttpLog> parsed = rdd.map(new Function<String, HttpLog>() {
    25. 25             public HttpLog call(String line) throws Exception {
    26. 26                 return HttpLog.parser(line);
    27. 27             }
    28. 28         });
    29. 29         System.out.println(parsed.count());
    30. 30         parsed.foreachPartition(new VoidFunction<Iterator<HttpLog>>() {
    31. 31             @Override
    32. 32             public void call(Iterator<HttpLog> t) throws Exception {
    33. 33                 HttpLog.save(t);
    34. 34             }
    35. 35         });
    36. 36     }
    37. 37 }
    复制代码


    1.   1 package com.hansight.spark.streaming;
    2.   2
    3.   3 import java.lang.reflect.Field;
    4.   4 import java.util.HashMap;
    5.   5 import java.util.Iterator;
    6.   6 import java.util.Map;
    7.   7
    8.   8 import org.elasticsearch.action.bulk.BulkRequestBuilder;
    9.   9 import org.elasticsearch.action.bulk.BulkResponse;
    10. 10 import org.elasticsearch.client.Client;
    11. 11
    12. 12 import com.hansight.spark.utils.EsUtils;
    13. 13
    14. 14 public class HttpLog {
    15. 15     private String rawlog;
    16. 16     // VARCHAR2(8 BYTE) 记录类型,表示此记录为HTTP浏览业务记录(取值为’HTTP’)
    17. 17     private String RECORD_TYPE;
    18. 18     // TIMESTAMP 开始时间,格式为:YYYY-MM-DD HH24:MI:SS.xxxxxxxxx
    19. 19     private String CAPTURETIME;
    20. 20     // VARCHAR2(16 BYTE) 手机号码(从创建PDP上下文消息中获取)
    21. 21     private String MSISDN;
    22. 22     // IMSI VARCHAR2(18 BYTE) 国际移动用户识别码(从创建PDP上下文消息中获取)
    23. 23     private String IMSI;
    24. 24     // IMEI(SV) VARCHAR2(20 BYTE) IMEI(SV)号(从创建PDP上下文消息中获取)
    25. 25     private String IMEI;
    26. 26     // VARCHAR2(32 BYTE) APN
    27. 27     private String APN;
    28. 28     // UEIP VARCHAR2(20 BYTE) 终端的IP
    29. 29     private String UEIP;
    30. 30     // SPIP VARCHAR2(20 BYTE) SP的IP
    31. 31     private String SPIP;
    32. 32     // UEPORT NUMBER(12) 终端端口
    33. 33     private int UEPORT;
    34. 34     // SPPORT NUMBER(12) SP端端口
    35. 35     private int SPPORT;
    36. 36     // USERAGENT VARCHAR2(64 BYTE) User Agent信息
    37. 37     private String USERAGENT;
    38. 38     // URL VARCHAR2(256 BYTE) URL,该字段的错误率应不超过万分之一
    39. 39     private String URL;
    40. 40     // HOST VARCHAR2(64 BYTE) HOST信息
    41. 41     private String HOST;
    42. 42     // CONTENTLEN NUMBER(12) 内容大小
    43. 43     private String CONTENTLEN;
    44. 44     // CONTENTTYPE VARCHAR2(64 BYTE) 内容类型
    45. 45     private String CONTENTTYPE;
    46. 46     // BSKIP NUMBER(12) 是否是链接访问,0=否,1=是
    47. 47     private boolean BSKIP;
    48. 48     // REFERER VARCHAR2(256 BYTE) 链接源信息
    49. 49     private String REFERER;
    50. 50     // HTTPSTATUS NUMBER(12) 状态码,请参照附录HTTPSTATUS表
    51. 51     private long HTTPSTATUS;
    52. 52     // RESPDELAY NUMBER(12) 响应时延,单位毫秒
    53. 53     private long RESPDELAY;
    54. 54     // HTTPACTION NUMBER(12) HTTP操作类型(5: Post, 6:Get)
    55. 55     private String HTTPACTION;
    56. 56     // DURATION NUMBER(12) 持续时长
    57. 57     private long DURATION;
    58. 58     // FLOW NUMBER(12) 总流量
    59. 59     private long FLOW;
    60. 60     // UPFLOW NUMBER(12) 上行流量
    61. 61     private long UPFLOW;
    62. 62     // DOWNFLOW NUMBER(12) 下行流量
    63. 63     private long DOWNFLOW;
    64. 64     // SGSNIP VARCHAR2(20 BYTE) SGSN 用户面 IP
    65. 65     private String SGSNIP;
    66. 66     // GGSNIP VARCHAR2(20 BYTE) GGSN 用户面 IP
    67. 67     private String GGSNIP;
    68. 68     // LAC NUMBER(12) LAC信息
    69. 69     private long LAC;
    70. 70     // CI NUMBER(12) CI/SAC信息
    71. 71     private String CI;
    72. 72     // RATTYPE NUMBER(12) RAT Type,1=2G,2=3G
    73. 73     private String RATTYPE;
    74. 74     // STOPTIME TIMESTAMP 结束时间,格式为:YYYY-MM-DD HH24:MI:SS.xxxxxxxxx
    75. 75     private String STOPTIME;
    76. 76     // PBIP VARCHAR2(20 BYTE) 采集解析设备IP地址
    77. 77     private String PBIP;
    78. 78     // PBID NUMBER(12) 采集解析设备编号
    79. 79     private long PBID;
    80. 80
    81. 81     public String getRawlog() {
    82. 82         return rawlog;
    83. 83     }
    84. 84
    85. 85     public void setRawlog(String rawlog) {
    86. 86         this.rawlog = rawlog;
    87. 87     }
    88. 88
    89. 89     public String getRECORD_TYPE() {
    90. 90         return RECORD_TYPE;
    91. 91     }
    92. 92
    93. 93     public void setRECORD_TYPE(String rECORD_TYPE) {
    94. 94         RECORD_TYPE = rECORD_TYPE;
    95. 95     }
    96. 96
    97. 97     public String getCAPTURETIME() {
    98. 98         return CAPTURETIME;
    99. 99     }
    100. 100
    101. 101     public void setCAPTURETIME(String cAPTURETIME) {
    102. 102         CAPTURETIME = cAPTURETIME;
    103. 103     }
    104. 104
    105. 105     public String getMSISDN() {
    106. 106         return MSISDN;
    107. 107     }
    108. 108
    109. 109     public void setMSISDN(String mSISDN) {
    110. 110         MSISDN = mSISDN;
    111. 111     }
    112. 112
    113. 113     public String getIMSI() {
    114. 114         return IMSI;
    115. 115     }
    116. 116
    117. 117     public void setIMSI(String iMSI) {
    118. 118         IMSI = iMSI;
    119. 119     }
    120. 120
    121. 121     public String getIMEI() {
    122. 122         return IMEI;
    123. 123     }
    124. 124
    125. 125     public void setIMEI(String iMEI) {
    126. 126         IMEI = iMEI;
    127. 127     }
    128. 128
    129. 129     public String getAPN() {
    130. 130         return APN;
    131. 131     }
    132. 132
    133. 133     public void setAPN(String aPN) {
    134. 134         APN = aPN;
    135. 135     }
    136. 136
    137. 137     public String getUEIP() {
    138. 138         return UEIP;
    139. 139     }
    140. 140
    141. 141     public void setUEIP(String uEIP) {
    142. 142         UEIP = uEIP;
    143. 143     }
    144. 144
    145. 145     public String getSPIP() {
    146. 146         return SPIP;
    147. 147     }
    148. 148
    149. 149     public void setSPIP(String sPIP) {
    150. 150         SPIP = sPIP;
    151. 151     }
    152. 152
    153. 153     public int getUEPORT() {
    154. 154         return UEPORT;
    155. 155     }
    156. 156
    157. 157     public void setUEPORT(int uEPORT) {
    158. 158         UEPORT = uEPORT;
    159. 159     }
    160. 160
    161. 161     public int getSPPORT() {
    162. 162         return SPPORT;
    163. 163     }
    164. 164
    165. 165     public void setSPPORT(int sPPORT) {
    166. 166         SPPORT = sPPORT;
    167. 167     }
    168. 168
    169. 169     public String getUSERAGENT() {
    170. 170         return USERAGENT;
    171. 171     }
    172. 172
    173. 173     public void setUSERAGENT(String uSERAGENT) {
    174. 174         USERAGENT = uSERAGENT;
    175. 175     }
    176. 176
    177. 177     public String getURL() {
    178. 178         return URL;
    179. 179     }
    180. 180
    181. 181     public void setURL(String uRL) {
    182. 182         URL = uRL;
    183. 183     }
    184. 184
    185. 185     public String getHOST() {
    186. 186         return HOST;
    187. 187     }
    188. 188
    189. 189     public void setHOST(String hOST) {
    190. 190         HOST = hOST;
    191. 191     }
    192. 192
    193. 193     public String getCONTENTLEN() {
    194. 194         return CONTENTLEN;
    195. 195     }
    196. 196
    197. 197     public void setCONTENTLEN(String cONTENTLEN) {
    198. 198         CONTENTLEN = cONTENTLEN;
    199. 199     }
    200. 200
    201. 201     public String getCONTENTTYPE() {
    202. 202         return CONTENTTYPE;
    203. 203     }
    204. 204
    205. 205     public void setCONTENTTYPE(String cONTENTTYPE) {
    206. 206         CONTENTTYPE = cONTENTTYPE;
    207. 207     }
    208. 208
    209. 209     public boolean isBSKIP() {
    210. 210         return BSKIP;
    211. 211     }
    212. 212
    213. 213     public void setBSKIP(boolean bSKIP) {
    214. 214         BSKIP = bSKIP;
    215. 215     }
    216. 216
    217. 217     public String getREFERER() {
    218. 218         return REFERER;
    219. 219     }
    220. 220
    221. 221     public void setREFERER(String rEFERER) {
    222. 222         REFERER = rEFERER;
    223. 223     }
    224. 224
    225. 225     public long getHTTPSTATUS() {
    226. 226         return HTTPSTATUS;
    227. 227     }
    228. 228
    229. 229     public void setHTTPSTATUS(long hTTPSTATUS) {
    230. 230         HTTPSTATUS = hTTPSTATUS;
    231. 231     }
    232. 232
    233. 233     public long getRESPDELAY() {
    234. 234         return RESPDELAY;
    235. 235     }
    236. 236
    237. 237     public void setRESPDELAY(long rESPDELAY) {
    238. 238         RESPDELAY = rESPDELAY;
    239. 239     }
    240. 240
    241. 241     public String getHTTPACTION() {
    242. 242         return HTTPACTION;
    243. 243     }
    244. 244
    245. 245     public void setHTTPACTION(String hTTPACTION) {
    246. 246         HTTPACTION = hTTPACTION;
    247. 247     }
    248. 248
    249. 249     public long getDURATION() {
    250. 250         return DURATION;
    251. 251     }
    252. 252
    253. 253     public void setDURATION(long dURATION) {
    254. 254         DURATION = dURATION;
    255. 255     }
    256. 256
    257. 257     public long getFLOW() {
    258. 258         return FLOW;
    259. 259     }
    260. 260
    261. 261     public void setFLOW(long fLOW) {
    262. 262         FLOW = fLOW;
    263. 263     }
    264. 264
    265. 265     public long getUPFLOW() {
    266. 266         return UPFLOW;
    267. 267     }
    268. 268
    269. 269     public void setUPFLOW(long uPFLOW) {
    270. 270         UPFLOW = uPFLOW;
    271. 271     }
    272. 272
    273. 273     public long getDOWNFLOW() {
    274. 274         return DOWNFLOW;
    275. 275     }
    276. 276
    277. 277     public void setDOWNFLOW(long dOWNFLOW) {
    278. 278         DOWNFLOW = dOWNFLOW;
    279. 279     }
    280. 280
    281. 281     public String getSGSNIP() {
    282. 282         return SGSNIP;
    283. 283     }
    284. 284
    285. 285     public void setSGSNIP(String sGSNIP) {
    286. 286         SGSNIP = sGSNIP;
    287. 287     }
    288. 288
    289. 289     public String getGGSNIP() {
    290. 290         return GGSNIP;
    291. 291     }
    292. 292
    293. 293     public void setGGSNIP(String gGSNIP) {
    294. 294         GGSNIP = gGSNIP;
    295. 295     }
    296. 296
    297. 297     public long getLAC() {
    298. 298         return LAC;
    299. 299     }
    300. 300
    301. 301     public void setLAC(long lAC) {
    302. 302         LAC = lAC;
    303. 303     }
    304. 304
    305. 305     public String getCI() {
    306. 306         return CI;
    307. 307     }
    308. 308
    309. 309     public void setCI(String cI) {
    310. 310         CI = cI;
    311. 311     }
    312. 312
    313. 313     public String getRATTYPE() {
    314. 314         return RATTYPE;
    315. 315     }
    316. 316
    317. 317     public void setRATTYPE(String rATTYPE) {
    318. 318         RATTYPE = rATTYPE;
    319. 319     }
    320. 320
    321. 321     public String getSTOPTIME() {
    322. 322         return STOPTIME;
    323. 323     }
    324. 324
    325. 325     public void setSTOPTIME(String sTOPTIME) {
    326. 326         STOPTIME = sTOPTIME;
    327. 327     }
    328. 328
    329. 329     public String getPBIP() {
    330. 330         return PBIP;
    331. 331     }
    332. 332
    333. 333     public void setPBIP(String pBIP) {
    334. 334         PBIP = pBIP;
    335. 335     }
    336. 336
    337. 337     public long getPBID() {
    338. 338         return PBID;
    339. 339     }
    340. 340
    341. 341     public void setPBID(long pBID) {
    342. 342         PBID = pBID;
    343. 343     }
    344. 344
    345. 345     public static HttpLog parser(String line) {
    346. 346         if (line == null) {
    347. 347             return null;
    348. 348         }
    349. 349         String[] arr = line.split("","");
    350. 350         HttpLog log = new HttpLog();
    351. 351         log.setRawlog(line);
    352. 352         if (arr.length != 31) {
    353. 353             return log;
    354. 354         }
    355. 355         String start = arr[0];
    356. 356         if (start != null) {
    357. 357             start = arr[0].substring(1);
    358. 358         }
    359. 359         log.setRECORD_TYPE(start);
    360. 360         log.setCAPTURETIME(arr[1]);
    361. 361         log.setMSISDN(arr[2]);
    362. 362         log.setIMSI(arr[3]);
    363. 363         log.setIMEI(arr[4]);
    364. 364         log.setAPN(arr[5]);
    365. 365         log.setUEIP(arr[6]);
    366. 366         log.setSPIP(arr[7]);
    367. 367         log.setUEPORT(Integer.parseInt(arr[8]));
    368. 368         log.setSPPORT(Integer.parseInt(arr[9]));
    369. 369         log.setUSERAGENT(arr[10]);
    370. 370         log.setURL(arr[11]);
    371. 371         log.setHOST(arr[12]);
    372. 372         log.setCONTENTLEN(arr[13]);
    373. 373         log.setCONTENTTYPE(arr[14]);
    374. 374         log.setBSKIP("1".equals(arr[15]));
    375. 375         log.setREFERER(arr[16]);
    376. 376         log.setHTTPSTATUS(Long.parseLong(arr[17]));
    377. 377         log.setRESPDELAY(Long.parseLong(arr[18]));
    378. 378         String action = arr[19];
    379. 379         if ("5".equals(action)) {
    380. 380             action = "POST";
    381. 381         } else if ("6".equals(action)) {
    382. 382             action = "GET";
    383. 383         }
    384. 384         log.setHTTPACTION(action);
    385. 385
    386. 386         log.setDURATION(Long.parseLong(arr[20]));
    387. 387         log.setFLOW(Long.parseLong(arr[21]));
    388. 388         log.setUPFLOW(Long.parseLong(arr[22]));
    389. 389         log.setDOWNFLOW(Long.parseLong(arr[23]));
    390. 390         log.setSGSNIP(arr[24]);
    391. 391         log.setGGSNIP(arr[25]);
    392. 392         log.setLAC(Long.parseLong(arr[26]));
    393. 393         log.setCI(arr[27]);
    394. 394         String ratType = arr[28];
    395. 395         if ("1".equals(ratType)) {
    396. 396             ratType = "2G";
    397. 397         } else if ("2".equals(ratType)) {
    398. 398             ratType = "3G";
    399. 399         }
    400. 400         log.setRATTYPE(ratType);
    401. 401         log.setSTOPTIME(arr[29]);
    402. 402         log.setPBIP(arr[30]);
    403. 403         String stop = arr[31];
    404. 404         if (stop != null) {
    405. 405             stop = stop.substring(0, stop.length() - 1);
    406. 406         }
    407. 407         log.setPBID(Long.parseLong(stop));
    408. 408         return log;
    409. 409     }
    410. 410
    411. 411     public static void save(Iterator<HttpLog> t) {
    412. 412         try {
    413. 413             Client client = EsUtils.getEsClient();
    414. 414             BulkRequestBuilder bulk = client.prepareBulk();
    415. 415             int index = 0;
    416. 416             while (t.hasNext()) {
    417. 417                 HttpLog log = t.next();
    418. 418                 index++;
    419. 419                 bulk.add(client.prepareIndex("logs_nuoxi", "http").setSource(
    420. 420                         log.toJSON()));
    421. 421                 if (index >= 500) {
    422. 422                     BulkResponse bulkResponse = bulk.execute().actionGet();
    423. 423                     if (bulkResponse.hasFailures()) {
    424. 424                         // 处理错误
    425. 425                         System.out.println(bulkResponse.buildFailureMessage());
    426. 426                     }
    427. 427                     index = 0;
    428. 428                 }
    429. 429             }
    430. 430             if (index != 0) {
    431. 431                 BulkResponse bulkResponse = bulk.execute().actionGet();
    432. 432                 if (bulkResponse.hasFailures()) {
    433. 433                     // 处理错误
    434. 434                     System.out.println(bulkResponse.buildFailureMessage());
    435. 435                 }
    436. 436             }
    437. 437         } catch (Exception e) {
    438. 438             e.printStackTrace();
    439. 439             throw e;
    440. 440         }
    441. 441         // client.close();
    442. 442     }
    443. 443
    444. 444     private Map<String, Object> toJSON() {
    445. 445         Field[] fields = this.getClass().getDeclaredFields();
    446. 446         Map<String, Object> map = new HashMap<>();
    447. 447         for (Field field : fields) {
    448. 448             field.setAccessible(true);
    449. 449             try {
    450. 450                 map.put(field.getName().toLowerCase(), field.get(this));
    451. 451             } catch (IllegalArgumentException | IllegalAccessException e) {
    452. 452                 e.printStackTrace();
    453. 453             }
    454. 454         }
    455. 455         return map;
    456. 456     }
    457. 457 }
    复制代码

    R
    回复

    使用道具 举报

    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    QQ|手机版|Java学习者论坛 ( 声明:本站资料整理自互联网,用于Java学习者交流学习使用,对资料版权不负任何法律责任,若有侵权请及时联系客服屏蔽删除 )

    GMT+8, 2024-5-1 01:44 , Processed in 0.352284 second(s), 37 queries .

    Powered by Discuz! X3.4

    © 2001-2017 Comsenz Inc.

    快速回复 返回顶部 返回列表