Splunk入门学习
概况
Splunk是一个可以让你去搜索、分析、可视化机器数据的软件,当定义好数据源之后,splunk会对数据源的数据进行索引,并将他们组织到一系列个性化的事件中供用户查看及搜索。
大多数用户使用Web浏览器连接到Splunk Enterprise,并使用Splunk Web来管理其部署,管理和创建知识对象,运行搜索,创建数据透视表和报表等等。您还可以使用命令行界面来管理Splunk Enterprise部署。
Splunk组件
- Forwarder : 转发器先使用数据,然后再将数据转发到索引器。转发器通常需要最少的资源,从而使它们可以轻松地驻留在生成数据的计算机上。
- Indexer: 索引器对通常从一组转发器接收的传入数据进行索引。索引器将数据转换为事件并将事件存储在索引中。索引器还响应于来自Search Head的搜索请求来搜索索引数据。为了确保高数据可用性并防止数据丢失,或者只是为了简化多个索引器的管理,您可以在索引器群集中部署多个索引器。
- Search Head:与用户交互,将搜索请求定向到一组索引器,然后将结果合并回用户。为了确保高可用性并简化水平扩展,您可以在搜索头集群中部署多个Search Head。
简单的分布式例子

遇到问题可以先到Splunk社区问答中搜索一下,看一下是否有人已经提出这个问题并得到解决。
搜索教程
上传数据
上传数据时,将数据转换的过程称为索引,处理传入的数据以实现快速搜索和分析,处理后的记过作为事件存储在索引中。
默认情况下,所有数据都放入一个名为main的预配置索引中。当您将数据添加到Splunk实例时,可以创建索引来存储数据
搜索教程
字段:作为索引过程的一部分,信息将从数据中提取出来,并格式化为名称和值对,称为字段
字段搜索格式为
字段名=字段值,注意事项:字段名区分大小写,字段值不区分大小写字段值可以使用通配符字段值中含有空格,必须使用引号
关键字:在搜索中需要用到多个关键字查询,用
OR、AND、NOT连接。默认空格=AND,注意的是,连接词是大写的。一个多关键字的例子,星号(*)字符用作通配符匹配fail,failure,failed,failing,等等。buttercupgames (error OR fail* OR severe)在评估布尔表达式时,括号内的术语优先。NOT子句先于OR子句进行求值。AND子句的优先级最低。
搜索结果:

事件下方有一个时间柱状图,标识了搜索条件在该时间段出现的次数,出现次数越多,柱状图就越高。还可以通过
-缩小按钮来获取更多时间段的统计。模式:将结构相似的时间进行统计。
搜索处理语言(SPL):Splunk开发了与Splunk软件一起使用的搜索处理语言(SPL)。SPL包含所有搜索命令及其功能,参数和子句。

在首选项中开启
搜索助理的完全模式,来学习。sourcetype=access_* status=200 action=purchase | top categoryId|: 管道搜索,以左边的结果为搜索输入。top: 通用命令,返回字段中最常见的字段,该top命令是**转换命令**。转换命令将搜索结果组织到一个表中。使用转换命令生成可用于创建可视化的结果,例如柱形图,折线图,面积图和饼图。
子搜索: 配合管道和转换命令来执行子查询。还可以用
AS标识符来对搜索结果列名进行重命名。
创建报告和图表
1.保存并共享报告
执行搜索
sourcetype=access_* status=200 action=purchase [search sourcetype=access_* status=200 action=purchase | top limit=1 clientip | table clientip] | stats count AS "Total Purchased", dc(productId) AS "Total Products", values(productName) AS "Product Names" BY clientip | rename clientip AS "VIP Customer"
将报告另存为报表,在报表栏中可以查看或编辑该报告。
将报告设置成共享,只需要设置报表的权限为 应用 即可。
2.创建基本图表
开始搜索
sourcetype=access_* status=200 | chart count AS views count(eval(action="addtocart")) AS addtocart count(eval(action="purchase")) AS purchases by productName | rename productName AS "Product Name", views AS "Views", addtocart AS "Adds to Cart", purchases AS "Purchases"
该搜索使用chart命令来算那些事件的数量action=purchase和action=addtocart。然后,搜索使用rename命令来重命名结果中显示的字段。
该chart命令是变换命令。搜索结果显示在“统计信息”选项卡上。

创建仪表板
仪表板是由面板组成的视图。面板可以包含诸如搜索框,字段,图表,表格和列表之类的模块。仪表板面板通常连接到报表。
- 创建一个搜索
sourcetype=access_* status=200 action=purchase | top categoryId
点击可视化,设置为饼图,并保存为仪表版面板

创建一个新的仪表板面板


编辑仪表板

可在
添加面板中,为仪表板添加更多视图。
搜索语言(SPL)
Splunk SDK For Java
概况
使用Splunk Enterprise SDK for Java,您可以编写Java应用程序以编程方式与Splunk引擎进行交互。该SDK建立在REST API的基础上,提供了REST API端点的包装。
详细说明请看:Splunk SDK For Java 官方文档
命名空间
为了在整个Splunk安装过程中考虑到用户查看应用程序,系统文件和其他实体资源的权限,Splunk提供了基于名称空间对实体资源的访问。这类似于使用端点访问资源时Splunk REST API使用的应用程序/用户上下文。命名空间用法为ower,app,共享模式
- ower:Splunk用户名,如
admin。值为nobody表示没有特定用户。 - app:应用程序,如
search。 值为-表示所有应用程序。 - 共享模式:
- “user”: 资源是特定用户的资源
- “app”: 资源是由app共享的,由app指定
- “global”: 资源是所有app共享的
- “system”: 资源是系统资源
列出所有保存的针对Kramer用户的搜索: kramer,-,user
快速入门
1.安装依赖
在项目的 pom.xml文件中添加SDK依赖。
<repositories>
...
<repository>
<id>splunk-artifactory</id>
<name>Splunk Releases</name>
<url>https://splunk.jfrog.io/splunk/ext-releases-local</url>
</repository>
</repositories>
<dependencies>
...
<dependency>
<groupId>com.splunk</groupId>
<artifactId>splunk</artifactId>
<version>1.6.0.0</version>
</dependency>
</dependencies>
2.连接到Splunk Enterprise
//解决SSL3报错,需要加入下面语句
HttpService.setSslSecurityProtocol(SSLSecurityProtocol.TLSv1_2);
ServiceArgs loginArgs = new ServiceArgs();
loginArgs.setUsername("admin");
loginArgs.setPassword("tangseng233");
loginArgs.setHost("192.168.215.131");
loginArgs.setPort(8089);
Service service = Service.connect(loginArgs);
3.处理搜索和作业
搜索以不同的方式运行,确定了它何时运行及如何检索结果:
- 普通:普通搜索异步运行。它立即返回搜索作业。轮询作业以确定其状态。搜索完成后,您可以检索结果。如果启用了“预览”,您还可以预览结果。普通模式适用于实时搜索。
- 阻止:阻止搜索同步运行。在搜索完成之前,它不会返回搜索作业,因此无需轮询状态。阻止模式不适用于实时搜索。
- Oneshot:oneshot搜索是计划立即运行的阻止搜索。此模式不返回搜索作业,而是在完成后返回搜索结果。因为这是一个阻塞搜索,所以结果直到搜索完成才可用。
- 实时:实时搜索以正常模式运行,并在实时事件流入Splunk Enterprise进行索引时对其进行搜索。返回的事件在指定的时间范围内符合您的条件。
- 导出:导出搜索将立即运行,不返回搜索作业,并立即开始流式传输结果。此搜索对于从Splunk Enterprise导出大量数据很有用。
对于那些产生搜索作业(正常,阻止和实时)的搜索,搜索结果将在服务器上保存一段时间,并可根据要求进行检索。对于流式传输结果(oneshot和导出)的那些搜索,搜索结果不会保留在服务器上。如果流由于某种原因而中断,则无法再次运行搜索结果就无法恢复。
搜索基本代码
普通搜索
import com.splunk.*;
...
//开始一个普通搜索 (搜索所有,展示头100条)
String query = "search * | head 100"
JobArgs jobargs = new JobArgs();
//设置作业模式为普通搜索
jobargs.setExecutionMode(JobArgs.ExecutionMode.NORMAL);
Job job = service.getJobs().create(query, jobargs);
// xml结果接收(后面详细讲)
InputStream resultsNormalSearch = job.getResults();
ResultsReaderXml resultsReaderNormalSearch;
try {
resultsReaderNormalSearch = new ResultsReaderXml(resultsNormalSearch);
HashMap<String, String> event;
//普通搜索是异步的,搜索结果需要用循环来确定是否还有下一个事件返回。
while ((event = resultsReaderNormalSearch.getNextEvent()) != null) {
System.out.println("\n*********EVENT****************\n");
for (String key: event.keySet())
System.out.println(" " + key + ": " + event.get(key));
}
} catch (Exception e) {
e.printStackTrace();
}
System.out.println("\nSearch job properties\n---------------------");
System.out.println("Search job ID: " + job.getSid());
System.out.println("The number of events: " + job.getEventCount());
System.out.println("The number of results: " + job.getResultCount());
System.out.println("Search duration: " + job.getRunDuration() + " seconds");
System.out.println("This job expires in: " + job.getTtl() + " seconds");
阻止搜索
String searchQuery_blocking = "search * | head 100"; //
JobArgs jobargs = new JobArgs();
//设置为阻止搜索
jobargs.setExecutionMode(JobArgs.ExecutionMode.BLOCKING);
//阻止搜索为同步搜索,当搜索完成后job也完全返回了,无需循环
System.out.println("Wait for the search to finish...");
Job job = service.getJobs().create(searchQuery_blocking, jobargs);
System.out.println("...done!\n");
System.out.println("Search job properties:\n---------------------");
System.out.println("Search job ID: " + job.getSid());
System.out.println("The number of events: " + job.getEventCount());
System.out.println("The number of results: " + job.getResultCount());
System.out.println("Search duration: " + job.getRunDuration() + " seconds");
System.out.println("This job expires in: " + job.getTtl() + " seconds");
OneShot
//设置搜索时间等参数
Args oneshotSearchArgs = new Args();
oneshotSearchArgs.put("earliest_time", "2012-06-19T12:00:00.000-07:00");
oneshotSearchArgs.put("latest_time", "2012-06-20T12:00:00.000-07:00");
String oneshotSearchQuery = "search * | head 10";
//与其他搜索不同,oneshot搜索不会创建搜索作业,因此无法使用Job和JobCollection类访问它。而是使用Service.oneshotSearch方法
InputStream results_oneshot = service.oneshotSearch(oneshotSearchQuery, oneshotSearchArgs);
// 直接处理结果
try {
ResultsReaderXml resultsReader = new ResultsReaderXml(results_oneshot);
System.out.println("Searching everything in a 24-hour time range starting June 19, 12:00pm and displaying 10 results in XML:\n");
HashMap<String, String> event;
while ((event = resultsReader.getNextEvent()) != null) {
System.out.println("\n********EVENT********");
for (String key: event.keySet())
System.out.println(" " + key + ": " + event.get(key));
}
resultsReader.close();
} catch (Exception e) {
e.printStackTrace();
}
实时搜索
//在设置的时间范围内,搜索作业会不断的返回新的结果,直到搜索结束,
JobArgs jobArgs = new JobArgs();
//执行模式设置为普通
jobArgs.setExecutionMode(JobArgs.ExecutionMode.NORMAL);
//查询模式设置为实时
jobArgs.setSearchMode(JobArgs.SearchMode.REALTIME);
//当前时间 rt -1m 减1分钟
jobArgs.setEarliestTime("rt-1m");
//当前时间
jobArgs.setLatestTime("rt");
jobArgs.setStatusBuckets(300);
//创建搜索作业
String mySearch = "search index=_internal";
Job job = service.search(mySearch, jobArgs);
// 等待作业准备完成
while (!job.isReady()) {
Thread.sleep(500);
}
//接收预览结果
JobResultsPreviewArgs previewArgs = new JobResultsPreviewArgs();
previewArgs.setCount(300); // Retrieve 300 previews at a time
//实时输出
while (true) {
InputStream stream = job.getResultsPreview(previewArgs);
String line = null;
BufferedReader reader = new BufferedReader(new InputStreamReader(
stream, "UTF-8"));
while ((line = reader.readLine()) != null) {
System.out.println(line);
}
reader.close();
stream.close();
Thread.sleep(500);
}
导出搜索
//创建搜索参数
JobExportArgs exportArgs = new JobExportArgs();
exportArgs.setEarliestTime("-1h");
exportArgs.setLatestTime("now");
exportArgs.setSearchMode(JobExportArgs.SearchMode.NORMAL);
//执行导出查询
String mySearch = "search index=_internal";
InputStream exportSearch = service.export(mySearch, exportArgs);
//MultiResultsReaderXml 接收导出结果
MultiResultsReaderXml multiResultsReader = new MultiResultsReaderXml(exportSearch);
int counter = 0;
for (SearchResults searchResults : multiResultsReader)
{
for (Event event : searchResults) {
System.out.println("***** Event " + counter++ + " *****");
for (String key: event.keySet())
System.out.println(" " + key + ": " + event.get(key));
}
}
multiResultsReader.close();
4.显示处理结果
运行搜索后,可以从搜索作业中检索不同的输出:
- 事件:搜索的未转换事件。
- 结果:处理完成后搜索的转换结果。如果搜索没有转换命令,则结果与事件相同。如果存在转换命令,则结果计数将小于事件计数。
- 结果预览:仍在进行中的搜索预览,或实时搜索的结果。搜索完成后,预览结果与结果相同。您必须为非实时搜索启用预览(为实时搜索自动启用预览)。
- 摘要:有关到目前为止已读取结果的搜索字段的摘要信息。将
status_buckets搜索作业上的“ ”设置为正值以访问此数据。 - 时间轴:到目前为止已读取的未转换事件随时间的事件分布。将
status_buckets搜索作业上的“ ”设置为正值以访问此数据。
此输出作为流以XML,JSON,JSON_COLS,JSON_ROWS,CSV,ATOM或RAW格式返回。您可以使用标准Java类显示直接结果,也可以创建自己的解析器。但是为方便起见,SDK包含XML,CSV和JSON的结果读取器,它们可以为您正确地解析和格式化结果,并为每个Splunk Enterprise版本处理每种输出类型的特性。
搜索结果API
Job.getEvents方法从搜索作业中检索事件。使用JobEventsArgs该类可以为方法指定其他参数。Job.getResults方法从搜索作业中检索结果。使用JobResultsArgs该类可以为方法指定其他参数。Job.getResultsPreview方法从搜索作业中检索结果预览。使用JobResultsPreviewArgs该类可以为方法指定其他参数。Job.getSummary方法从搜索作业中检索摘要数据。使用JobSummaryArgs该类可以为方法指定其他参数。Job.getTimeline方法从搜索作业中检索时间轴数据。
使用以下类与结果阅读器一起显示结果。对于导出搜索的结果,请使用多结果阅读器来解析返回的多个结果集。
ResultsReader是基类。ResultsReaderCsv显示CSV结果。ResultsReaderJson显示JSON结果。ResultsReaderXml显示XML结果。
MultiResultsReader是多结果的基类。(导出大量结果时用,返回数据流而不是job结果时用)MultiResultsReaderJson类显示多组JSON的结果。MultiResultsReaderXml级显示多组XML结果。
示例
无阅读器直接接收结果
String mySearch = "search * | head 5";
Job job = service.getJobs().create(mySearch);
while (!job.isDone()) {
Thread.sleep(500);
}
//
InputStream results = job.getResults();
String line = null;
System.out.println("Results from the search job as XML:\n");
//用原生字符缓冲流读取 (默认为xml格式结果流)
BufferedReader br = new BufferedReader(new InputStreamReader(results, "UTF-8"));
while ((line = br.readLine()) != null) {
System.out.println(line);
}
br.close();
ResultsReaderJson 读取
String mySearch = "search * | head 5";
Job job = service.getJobs().create(mySearch);
while (!job.isDone()) {
Thread.sleep(500);
}
JobResultsArgs resultsArgs = new JobResultsArgs();
//设置结果 格式化成Json格式
resultsArgs.setOutputMode(JobResultsArgs.OutputMode.JSON);
//ResultsReaderJson 接收结果
InputStream results = job.getResults(resultsArgs);
ResultsReaderJson resultsReader = new ResultsReaderJson(results);
HashMap<String, String> event;
System.out.println("\nFormatted results from the search job as JSON\n");
while ((event = resultsReader.getNextEvent()) != null) {
for (String key: event.keySet())
System.out.println(" " + key + ": " + event.get(key));
}
resultsReader.close();
显示预览结果
只要满足以下两个条件,就可以显示正在进行的搜索结果的预览:
- 搜索必须在正常执行模式下运行(“
exec_mode“是”normal“)。预览不适用于阻止搜索(搜索作业ID在完成搜索后才可用)或流式搜索(无论如何都将返回结果)。 - 必须启用预览(“
preview”为“1”)。默认情况下,仅对实时搜索启用预览,并且将“status_buckets”设置为正值。使用该Job.enablePreview方法可以为现有搜索作业启用预览。
要显示预览,请运行搜索,为搜索作业启用预览,然后从中检索预览结果。默认情况下,将检索最近的100个预览。要更改此数字,请为“ count”设置一个值。使用“ offset”值可以浏览大量预览。
String mySearch = "search * | head 50000";
JobArgs jobArgs = new JobArgs();
jobArgs.setExecutionMode(JobArgs.ExecutionMode.NORMAL);
Job job = service.search(mySearch, jobArgs);
job.enablePreview();
job.update();
while (!job.isReady()) {
Thread.sleep(500);
}
int countPreview=0;
int countBatch=0;
while (!job.isDone()) {
JobResultsPreviewArgs previewargs = new JobResultsPreviewArgs();
previewargs.setCount(500); // Get 500 previews at a time
previewargs.setOutputMode(JobResultsPreviewArgs.OutputMode.XML);
InputStream results = job.getResultsPreview(previewargs);
ResultsReaderXml resultsReader = new ResultsReaderXml(results);
HashMap<String, String> event;
while ((event = resultsReader.getNextEvent()) != null) {
System.out.println("BATCH " + countBatch + "\nPREVIEW " + countPreview++ + " ********");
for (String key: event.keySet())
System.out.println(" " + key + ": " + event.get(key));
}
countBatch++;
resultsReader.close();
}
System.out.println("Job is done with " + job.getResultCount() + " results");