java多线程处理执行solr创建索引示例

复制代码 代码如下:

public class SolrIndexer implements Indexer, Searcher, DisposableBean {
 //~ Static fields/initializers =============================================

static final Logger logger = LoggerFactory.getLogger(SolrIndexer.class);

private static final long SHUTDOWN_TIMEOUT    = 5 * 60 * 1000L; // long enough

private static final int  INPUT_QUEUE_LENGTH  = 16384;

//~ Instance fields ========================================================

private CommonsHttpSolrServer server;

private BlockingQueue<Operation> inputQueue;

private Thread updateThread;
 volatile boolean running = true;
 volatile boolean shuttingDown = false;

//~ Constructors ===========================================================

public SolrIndexer(String url) throws MalformedURLException {
  server = new CommonsHttpSolrServer(url);

inputQueue = new ArrayBlockingQueue<Operation>(INPUT_QUEUE_LENGTH);

updateThread = new Thread(new UpdateTask());
  updateThread.setName("SolrIndexer");
  updateThread.start();
 }

//~ Methods ================================================================

public void setSoTimeout(int timeout) {
  server.setSoTimeout(timeout);
 }

public void setConnectionTimeout(int timeout) {
  server.setConnectionTimeout(timeout);
 }

public void setAllowCompression(boolean allowCompression) {
  server.setAllowCompression(allowCompression);
 }

public void addIndex(Indexable indexable) throws IndexingException {
  if (shuttingDown) {
   throw new IllegalStateException("SolrIndexer is shutting down");
  }
  inputQueue.offer(new Operation(indexable, OperationType.UPDATE));
 }

public void delIndex(Indexable indexable) throws IndexingException {
  if (shuttingDown) {
   throw new IllegalStateException("SolrIndexer is shutting down");
  }
  inputQueue.offer(new Operation(indexable, OperationType.DELETE));
 }

private void updateIndices(String type, List<Indexable> indices) throws IndexingException {
  if (indices == null || indices.size() == 0) {
   return;
  }

logger.debug("Updating {} indices", indices.size());

UpdateRequest req = new UpdateRequest("/" + type + "/update");
  req.setAction(UpdateRequest.ACTION.COMMIT, false, false);

for (Indexable idx : indices) {
   Doc doc = idx.getDoc();

SolrInputDocument solrDoc = new SolrInputDocument();
   solrDoc.setDocumentBoost(doc.getDocumentBoost());
   for (Iterator<Field> i = doc.iterator(); i.hasNext();) {
    Field field = i.next();
    solrDoc.addField(field.getName(), field.getValue(), field.getBoost());
   }

req.add(solrDoc);   
  }

try {
   req.process(server);   
  } catch (SolrServerException e) {
   logger.error("SolrServerException occurred", e);
   throw new IndexingException(e);
  } catch (IOException e) {
   logger.error("IOException occurred", e);
   throw new IndexingException(e);
  }
 }

private void delIndices(String type, List<Indexable> indices) throws IndexingException {
  if (indices == null || indices.size() == 0) {
   return;
  }

logger.debug("Deleting {} indices", indices.size());

UpdateRequest req = new UpdateRequest("/" + type + "/update");
  req.setAction(UpdateRequest.ACTION.COMMIT, false, false);
  for (Indexable indexable : indices) {   
   req.deleteById(indexable.getDocId());
  }

try {
   req.process(server);
  } catch (SolrServerException e) {
   logger.error("SolrServerException occurred", e);
   throw new IndexingException(e);
  } catch (IOException e) {
   logger.error("IOException occurred", e);
   throw new IndexingException(e);
  }
 }

public QueryResult search(Query query) throws IndexingException {
  SolrQuery sq = new SolrQuery();
  sq.setQuery(query.getQuery());
  if (query.getFilter() != null) {
   sq.addFilterQuery(query.getFilter());
  }
  if (query.getOrderField() != null) {
   sq.addSortField(query.getOrderField(), query.getOrder() == Query.Order.DESC ? SolrQuery.ORDER.desc : SolrQuery.ORDER.asc);
  }
  sq.setStart(query.getOffset());
  sq.setRows(query.getLimit());

QueryRequest req = new QueryRequest(sq);
  req.setPath("/" + query.getType() + "/select");

try {
   QueryResponse rsp = req.process(server);
   SolrDocumentList docs = rsp.getResults();

QueryResult result = new QueryResult();
   result.setOffset(docs.getStart());
   result.setTotal(docs.getNumFound());
   result.setSize(sq.getRows());

List<Doc> resultDocs = new ArrayList<Doc>(result.getSize());
   for (Iterator<SolrDocument> i = docs.iterator(); i.hasNext();) {
    SolrDocument solrDocument = i.next();

Doc doc = new Doc();
    for (Iterator<Map.Entry<String, Object>> iter = solrDocument.iterator(); iter.hasNext();) {
     Map.Entry<String, Object> field = iter.next();
     doc.addField(field.getKey(), field.getValue());
    }

resultDocs.add(doc);
   }

result.setDocs(resultDocs);
   return result;

} catch (SolrServerException e) {
   logger.error("SolrServerException occurred", e);
   throw new IndexingException(e);
  }
 }

public void destroy() throws Exception {
  shutdown(SHUTDOWN_TIMEOUT, TimeUnit.MILLISECONDS);  
 }

public boolean shutdown(long timeout, TimeUnit unit) {
  if (shuttingDown) {
   logger.info("Suppressing duplicate attempt to shut down");
   return false;
  }
  shuttingDown = true;
  String baseName = updateThread.getName();
  updateThread.setName(baseName + " - SHUTTING DOWN");
  boolean rv = false;
  try {
   // Conditionally wait
   if (timeout > 0) {
    updateThread.setName(baseName + " - SHUTTING DOWN (waiting)");
    rv = waitForQueue(timeout, unit);
   }
  } finally {
   // But always begin the shutdown sequence
   running = false;
   updateThread.setName(baseName + " - SHUTTING DOWN (informed client)");
  }
  return rv;
 }

/**
  * @param timeout
  * @param unit
  * @return
  */
 private boolean waitForQueue(long timeout, TimeUnit unit) {
  CountDownLatch latch = new CountDownLatch(1);
  inputQueue.add(new StopOperation(latch));
  try {
   return latch.await(timeout, unit);
  } catch (InterruptedException e) {
   throw new RuntimeException("Interrupted waiting for queues", e);
  }
 }

class UpdateTask implements Runnable {
  public void run() {
   while (running) {
    try {
     syncIndices();
    } catch (Throwable e) {
     if (shuttingDown) {
      logger.warn("Exception occurred during shutdown", e);
     } else {
      logger.error("Problem handling solr indexing updating", e);
     }
    }
   }
   logger.info("Shut down SolrIndexer");
  }
 }

void syncIndices() throws InterruptedException {
  Operation op = inputQueue.poll(1000L, TimeUnit.MILLISECONDS);

if (op == null) {
   return;
  }

if (op instanceof StopOperation) {
   ((StopOperation) op).stop();
   return;
  }

// wait 1 second
  try {
   Thread.sleep(1000);
  } catch (InterruptedException e) {

}

List<Operation> ops = new ArrayList<Operation>(inputQueue.size() + 1);
  ops.add(op);
  inputQueue.drainTo(ops);

Map<String, List<Indexable>> deleteMap = new HashMap<String, List<Indexable>>(4);
  Map<String, List<Indexable>> updateMap = new HashMap<String, List<Indexable>>(4);

for (Operation o : ops) {
   if (o instanceof StopOperation) {
    ((StopOperation) o).stop();
   } else {
    Indexable indexable = o.indexable;
    if (o.type == OperationType.DELETE) {
     List<Indexable> docs = deleteMap.get(indexable.getType());
     if (docs == null) {
      docs = new LinkedList<Indexable>();
      deleteMap.put(indexable.getType(), docs);
     }
     docs.add(indexable);
    } else {
     List<Indexable> docs = updateMap.get(indexable.getType());
     if (docs == null) {
      docs = new LinkedList<Indexable>();
      updateMap.put(indexable.getType(), docs);
     }
     docs.add(indexable);
    }
   }
  }

for (Iterator<Map.Entry<String, List<Indexable>>> i = deleteMap.entrySet().iterator(); i.hasNext();) {
   Map.Entry<String, List<Indexable>> entry = i.next();
   delIndices(entry.getKey(), entry.getValue());
  }

for (Iterator<Map.Entry<String, List<Indexable>>> i = updateMap.entrySet().iterator(); i.hasNext();) {
   Map.Entry<String, List<Indexable>> entry = i.next();
   updateIndices(entry.getKey(), entry.getValue());
  }
 }

enum OperationType { DELETE, UPDATE, SHUTDOWN }

static class Operation {
  OperationType type;
  Indexable indexable;

Operation() {}

Operation(Indexable indexable, OperationType type) {
   this.indexable = indexable;
   this.type = type;
  }
 }

static class StopOperation extends Operation {
  CountDownLatch latch;

StopOperation(CountDownLatch latch) {
   this.latch = latch;
   this.type = OperationType.SHUTDOWN;
  }

public void stop() {
   latch.countDown();
  }
 }

//~ Accessors ===============

}

时间: 2014-02-24

Solr通过特殊字符分词实现自定义分词器详解

前言 我们在对英文句子分词的时候,一般采用采用的分词器是WhiteSpaceTokenizerFactory,有一次因业务要求,需要根据某一个特殊字符(以逗号分词,以竖线分词)分词.感觉这种需求可能与WhiteSpaceTokenizerFactory相像,于是自己根据Solr源码自定义了分词策略. 业务场景 有一次,我拿到的数据都是以竖线"|"分隔,分词的时候,需要以竖线为分词单元.比如下面的这一堆数据: 有可能你拿到的是这样的数据,典型的例子就是来自csv文件的数据,格式和下面这种

详解java整合solr5.0之solrj的使用

1.首先导入solrj需要的的架包 2.需要注意的是低版本是solr是使用SolrServer进行URL实例的,5.0之后已经使用SolrClient替代这个类了,在添加之后首先我们需要根据schema.xml配置一下我们的分词器 这里的msg_all还需要在schema.xml中配置 它的主要作用是将msg_title,msg_content两个域的值拷贝到msg_all域中,我们在搜索的时候可以只搜索这个msg_all域就可以了, solr默认搜索需要带上域,比如 solr更改默认搜索域的地

详解spring中使用solr的代码实现

在介绍solr的使用方法之前,我们需要安装solr的服务端集群.基本上就是安装zookeeper,tomcat,jdk,solr,然后按照需要配置三者的配置文件即可.由于本人并没有具体操作过如何进行solr集群的搭建.所以关于如何搭建solr集群,读者可以去网上查看其它资料,有很多可以借鉴.这里只介绍搭建完solr集群之后,我们客户端是如何访问solr集群的. 之前介绍过,spring封装nosql和sql数据库的使用,都是通过xxxTemplate.solr也不例外. 我们需要引入solr的j

solr在java中的使用实例代码

SolrJ是操作Solr的Java客户端,它提供了增加.修改.删除.查询Solr索引的JAVA接口.SolrJ针对 Solr提供了Rest 的HTTP接口进行了封装, SolrJ底层是通过使用httpClient中的方法来完成Solr的操作. jar包的引用(maven pom.xml): <dependency> <groupId>org.apache.solr</groupId> <artifactId>solr-solrj</artifactId

Java中Console对象实例代码

在JDK 6中新增了java.io.Console类,可以让您取得字节为基础的主控台装置,例如,您可以藉由System新增的console()方法取得标准输入输出装置的Console对象,并利用它来执行一些简单的主控台文字输入输出,例如: ConsoleDemo.java import java.io.Console; public class ConsoleDemo { public static void main(String[] args) { System.out.print("请输入

java读取http请求中的body实例代码

在http请求中,有Header和Body之分,读取header使用request.getHeader("..."); 读取Body使用request.getReader(),但getReader获取的是BufferedReader,需要把它转换成字符串, 下面是转换的方法. public static String getBodyString(BufferedReader br) { String inputLine; String str = ""; try {

java 中匿名内部类的实例详解

java 中匿名内部类的实例详解 原来的面貌: class TT extends Test{ void show() { System.out.println(s+"~~~哈哈"); System.out.println("超级女声"); } TT tt=new TT(); tt.show(); 只是说我们这里采用的是匿名的形式来处理. 重写了Test的show()方法,在重写好了以后,又调用了重写后的show()方法 实现代码: package cn.com; c

浅谈JAVA中输入输出流实例详解

java语言的输入输出功能是十分强大而灵活的,美中不足的是看上去输入输出的代码并不是很简洁,因为你往往需要包装许多不同的对象.在Java类库中,IO部分的内容是很庞大的,因为它涉及的领域很广泛:标准输入输出,文件的操作,网络上的数据流,字符串流,对象流,zip文件流....本文的目的是为大家介绍JAVA中输入输出流实例详解. 流的层次结构 定义:        java将读取数据对象成为输入流,能向其写入的对象叫输出流.结构图如下: 1.输入输出: 输入/输出(Input/Output)是指对某

Java Web用户登录实例代码

实现功能: 1.用户登陆.注销 2.利用session记录用户登陆信息 3.在JSP中展示已登陆用户信息 实现原理: 登陆后通过判断用户名和密码是否和存储的一致,如果一致,就把用户信息放到session中储存:如果不一致就提示信息,并且返回登陆页面. 显示信息页面上固定从session中找用户登陆信息,找到就显示用户信息,没找到就显示登陆框. 注销很简单,就是清空session信息. 主要文件: 1.LoginAction:struts2的Action类,用于处理JAVA端的主要登陆和登出逻辑.

Java 冒泡排序、快速排序实例代码

冒泡排序 冒泡排序是一种简单的排序算法.它重复地走访过要排序的数列,一次比较两个元素,如果他们的顺序错误就把他们交换过来.走访数列的工作是重复地 进行直到没有再需要交换,也就是说该数列已经排序完成.这个算法的名字由来是因为越小的元素会经由交换慢慢"浮"到数列的顶端. 冒泡排序的算法实现如下:[排序后,数组从小到大排列] /** * 冒泡排序 * 比较相邻的元素.如果第一个比第二个大,就交换他们两个. * 对每一对相邻元素作同样的工作,从开始第一对到结尾的最后一对.在这一点,最后的元素应

java多线程的同步方法实例代码

java多线程的同步方法实例代码 先看一个段有关银行存钱的代码: class Bank { private int sum; public void add(int num){ sum = sum + num; try { Thread.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("total num is : " + sum); } } class Cu

Java生成验证码功能实例代码

页面上输入验证码是比较常见的一个功能,实现起来也很简单.给大家写一个简单的生成验证码的示例程序,需要的朋友可以借鉴一下. 闲话少续,直接上代码.代码中的注释很详细. package com.SM_test.utils; import java.awt.Color; import java.awt.Font; import java.awt.Graphics; import java.awt.Graphics2D; import java.awt.RenderingHints; import ja

详解java中的四种代码块

在java中用{}括起来的称为代码块,代码块可分为以下四种: 一.简介 1.普通代码块: 类中方法的方法体 2.构造代码块: 构造块会在创建对象时被调用,每次创建时都会被调用,优先于类构造函数执行. 3.静态代码块: 用static{}包裹起来的代码片段,只会执行一次.静态代码块优先于构造块执行. 4.同步代码块: 使用synchronized(){}包裹起来的代码块,在多线程环境下,对共享数据的读写操作是需要互斥进行的,否则会导致数据的不一致性.同步代码块需要写在方法中. 二.静态代码块和构造