Java HTTP协议收发MQ 消息代码实例详解

1. 准备环境

在工程 POM 文件添加 HTTP Java 客户端的依赖。

<dependency>
  <groupId>org.eclipse.jetty</groupId>
  <artifactId>jetty-client</artifactId>
  <version>9.3.4.RC1</version>
 </dependency>
 <dependency>
  <groupId>com.aliyun.openservices</groupId>
  <artifactId>ons-client</artifactId>
  <version>1.1.11</version>
 </dependency>

2. 运行代码配置(user.properties)

您需要设置配置文件(user.properties)的相关内容,具体请参考申请 MQ 资源 。

#您在控制台创建的Topic
Topic=xxx
#公测url
URL=http://publictest-rest.ons.aliyun.com
#阿里云身份验证码
Ak=xxx
#阿里云身份验证密钥
Sk=xxx
#MQ控制台创建的Producer ID
ProducerID=xxx
#MQ控制台创建的Consumer ID
ConsumerID=xxx

说明:URL 中的 Key,Tag以及 POST Content-Type 没有任何的限制,只要确保Key 和 Tag 相同唯一即可,可以放在 user.properties 里面。

3. HTTP 发送消息示例代码

您可以按以下说明设置相应参数并测试 HTTP 消息发送功能。

package com.aliyun.openservice.ons.http.demo;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.Properties;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.util.StringContentProvider;
import com.aliyun.openservices.ons.api.impl.authority.AuthUtil;
public class HttpProducer {
  public static String SIGNATURE="Signature";
  public static String NUM="num";
  public static String CONSUMERID="ConsumerID";
  public static String PRODUCERID="ProducerID";
  public static String TIMEOUT="timeout";
  public static String TOPIC="Topic";
  public static String AK="AccessKey";
  public static String BODY="body";
  public static String MSGHANDLE="msgHandle";
  public static String TIME="time";
  public static void main(String[] args) throws Exception {
    HttpClient httpClient=new HttpClient();
    httpClient.setMaxConnectionsPerDestination(1);
    httpClient.start();
    Properties properties=new Properties();
    properties.load(HttpProducer.class.getClassLoader().getResourceAsStream("user.properties"));
    String topic=properties.getProperty("Topic"); //请在user.properties配置您的Topic
    String url=properties.getProperty("URL");//公测集群配置为http://publictest-rest.ons.aliyun.com/
    String ak=properties.getProperty("Ak");//请在user.properties配置您的Ak
    String sk=properties.getProperty("Sk");//请在user.properties配置您的Sk
    String pid=properties.getProperty("ProducerID");//请在user.properties配置您的Producer ID
    String date=String.valueOf(new Date().getTime());
    String sign=null;
    String body="hello ons http";
    String NEWLINE="\n";
    String signString;
    for (int i = 0; i < 10; i++) {
      date=String.valueOf(new Date().getTime());
      Request req=httpClient.POST(url+"message/?topic="+topic+"&time="+date+"&tag=http"+"&key=http");
      ContentProvider content=new StringContentProvider(body);
      req.content(content);
      signString=topic+NEWLINE+pid+NEWLINE+MD5.getInstance().getMD5String(body)+NEWLINE+date;
      System.out.println(signString);
      sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk);
      req.header(SIGNATURE, sign);
      req.header(AK, ak);
      req.header(PRODUCERID, pid);
      ContentResponse response;
      response=req.send();
      System.out.println("send msg:"+response.getStatus()+response.getContentAsString());
    }
  }
}

4. HTTP接收消息示例代码

请按以下说明设置相应参数并测试 HTTP 消息接收功能。

package com.aliyun.openservice.ons.http.demo;
import java.nio.charset.Charset;
import java.util.Date;
import java.util.List;
import java.util.Properties;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentProvider;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.util.StringContentProvider;
import org.eclipse.jetty.http.HttpMethod;
import com.alibaba.fastjson.JSON;
import com.aliyun.openservice.ons.mqtt.demo.MqttProducer;
import com.aliyun.openservices.ons.api.impl.authority.AuthUtil;
public class HttpConsumer {
  public static String SIGNATURE="Signature";
  public static String NUM="num";
  public static String CONSUMERID="ConsumerID";
  public static String PRODUCERID="ProducerID";
  public static String TIMEOUT="timeout";
  public static String TOPIC="Topic";
  public static String AK="AccessKey";
  public static String BODY="body";
  public static String MSGHANDLE="msgHandle";
  public static String TIME="time";
  public static void main(String[] args) throws Exception {
    HttpClient httpClient=new HttpClient();
    httpClient.setMaxConnectionsPerDestination(1);
    httpClient.start();
    Properties properties=new Properties();
    properties.load(HttpConsumer.class.getClassLoader().getResourceAsStream("user.properties"));
    String topic=properties.getProperty("Topic"); //请在user.properties配置您的topic
    String url=properties.getProperty("URL");//公测集群配置为http://publictest-rest.ons.aliyun.com/
    String ak=properties.getProperty("Ak");//请在user.properties配置您的Ak
    String sk=properties.getProperty("Sk");//请在user.properties配置您的Sk
    String cid=properties.getProperty("ConsumerID");//请在user.properties配置您的Consumer ID
    String date=String.valueOf(new Date().getTime());
    String sign=null;
    String NEWLINE="\n";
    String signString;
    System.out.println(NEWLINE+NEWLINE);
    while (true) {
      try {
        date=String.valueOf(new Date().getTime());
        Request req=httpClient.POST(url+"message/?topic="+topic+"&time="+date+"&num="+32);
        req.method(HttpMethod.GET);
        ContentResponse response;
        signString=topic+NEWLINE+cid+NEWLINE+date;
        sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk);
        req.header(SIGNATURE, sign);
        req.header(AK, ak);
        req.header(CONSUMERID, cid);
        long start=System.currentTimeMillis();
        response=req.send();
        System.out.println("get cost:"+(System.currentTimeMillis()-start)/1000
                  +"  "+response.getStatus()+"  "+response.getContentAsString());
        List<SimpleMessage> list = null;
        if (response.getContentAsString()!=null&&!response.getContentAsString().isEmpty()) {
           list=JSON.parseArray(response.getContentAsString(), SimpleMessage.class);
        }
        if (list==null||list.size()==0) {
          Thread.sleep(100);
          continue;
        }
        System.out.println("size is :"+list.size());
        for (SimpleMessage simpleMessage : list) {
          date=String.valueOf(new Date().getTime());
          System.out.println("receive msg:"+simpleMessage.getBody()+"  born time "+simpleMessage.getBornTime());
          req=httpClient.POST(url+"message/?msgHandle="+simpleMessage.getMsgHandle()+"&topic="+topic+"&time="+date);
          req.method(HttpMethod.DELETE);
          signString=topic+NEWLINE+cid+NEWLINE+simpleMessage.getMsgHandle()+NEWLINE+date;
          sign=AuthUtil.calSignature(signString.getBytes(Charset.forName("UTF-8")), sk);
          req.header(SIGNATURE, sign);
          req.header(AK, ak);
          req.header(CONSUMERID, cid);
          response=req.send();
          System.out.println("delete msg:"+response.toString());
        }
        Thread.sleep(100);
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }
}

5. HTTP示例程序工具类

(1)消息封装类: SimpleMessage.java

package com.aliyun.openservice.ons.http.demo;
public class SimpleMessage {
  private String body;
  private String msgId;
  private String bornTime;
  private String msgHandle;
  private int reconsumeTimes;
  private String tag;
  public void setTag(String tag) {
    this.tag = tag;
  }
  public String getTag() {
    return tag;
  }
  public int getReconsumeTimes() {
    return reconsumeTimes;
  }
  public void setReconsumeTimes(int reconsumeTimes) {
    this.reconsumeTimes = reconsumeTimes;
  }
  public void setMsgHandle(String msgHandle) {
    this.msgHandle = msgHandle;
  }
  public String getMsgHandle() {
    return msgHandle;
  }
  public String getBody() {
    return body;
  }
  public void setBody(String body) {
    this.body = body;
  }
  public String getMsgId() {
    return msgId;
  }
  public void setMsgId(String msgId) {
    this.msgId = msgId;
  }
  public String getBornTime() {
    return bornTime;
  }
  public void setBornTime(String bornTime) {
    this.bornTime = bornTime;
  }
}

(2)字符串签名类: MD5.java

package com.aliyun.openservice.ons.http.demo;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.security.MessageDigest;
import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;
import org.slf4j.LoggerFactory;
public class MD5 {
  private static final org.slf4j.Logger log = LoggerFactory.getLogger(MD5.class);
  private static char[] digits = { '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f' };
  private static Map<Character, Integer> rDigits = new HashMap<Character, Integer>(16);
  static {
    for (int i = 0; i < digits.length; ++i) {
      rDigits.put(digits[i], i);
    }
  }
  private static MD5 me = new MD5();
  private MessageDigest mHasher;
  private final ReentrantLock opLock = new ReentrantLock();
  private MD5() {
    try {
      this.mHasher = MessageDigest.getInstance("md5");
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
  public static MD5 getInstance() {
    return me;
  }
  public String getMD5String(String content) {
    return this.bytes2string(this.hash(content));
  }
  public String getMD5String(byte[] content) {
    return this.bytes2string(this.hash(content));
  }
  public byte[] getMD5Bytes(byte[] content) {
    return this.hash(content);
  }
  public byte[] hash(String str) {
    this.opLock.lock();
    try {
      byte[] bt = this.mHasher.digest(str.getBytes("utf-8"));
      if (null == bt || bt.length != 16) {
        throw new IllegalArgumentException("md5 need");
      }
      return bt;
    } catch (UnsupportedEncodingException e) {
      throw new RuntimeException("unsupported utf-8 encoding", e);
    } finally {
      this.opLock.unlock();
    }
  }
  public byte[] hash(byte[] data) {
    this.opLock.lock();
    try {
      byte[] bt = this.mHasher.digest(data);
      if (null == bt || bt.length != 16) {
        throw new IllegalArgumentException("md5 need");
      }
      return bt;
    } finally {
      this.opLock.unlock();
    }
  }
  public String bytes2string(byte[] bt) {
    int l = bt.length;
    char[] out = new char[l << 1];
    for (int i = 0, j = 0; i < l; i++) {
      out[j++] = digits[(0xF0 & bt[i]) >>> 4];
      out[j++] = digits[0x0F & bt[i]];
    }
    if (log.isDebugEnabled()) {
      log.debug("[hash]" + new String(out));
    }
    return new String(out);
  }
  public byte[] string2bytes(String str) {
    if (null == str) {
      throw new NullPointerException("Argument is not allowed empty");
    }
    if (str.length() != 32) {
      throw new IllegalArgumentException("String length must equals 32");
    }
    byte[] data = new byte[16];
    char[] chs = str.toCharArray();
    for (int i = 0; i < 16; ++i) {
      int h = rDigits.get(chs[i * 2]).intValue();
      int l = rDigits.get(chs[i * 2 + 1]).intValue();
      data[i] = (byte) ((h & 0x0F) << 4 | l & 0x0F);
    }
    return data;
  }
}

希望本篇文章对您有所帮助

(0)

相关推荐

  • HTTP协议简介_动力节点Java学院整理

    TCP协议对应于传输层,而HTTP协议对应于应用层,从本质上来说,二者没有可比性.Http协议是建立在TCP协议基础之上的,当浏览器需要从服务器获取网页数据的时候,会发出一次Http请求.Http会通过TCP建立起一个到服务器的连接通道,当本次请求需要的数据完毕后,Http会立即将TCP连接断开,这个过程是很短的.所以Http连接是一种短连接,是一种无状态的连接.所谓的无状态,是指浏览器每次向服务器发起请求的时候,不是通过一个连接,而是每次都建立一个新的连接.如果是一个连接的话,服务器进程中就能

  • javaweb中Http协议详解

    一.什么是HTTP协议 HTTP是hypertext transfer protocol(超文本传输协议)的简写,它是TCP/IP协议的一个应用层协议,用于定义WEB浏览器与WEB服务器之间交换数据的过程.客户端连上web服务器后,若想获得web服务器中的某个web资源,需遵守一定的通讯格式,HTTP协议用于定义客户端与web服务器通迅的格式. 二.HTTP协议的版本 HTTP协议的版本:HTTP/1.0.HTTP/1.1 三.HTTP1.0和HTTP1.1的区别 在HTTP1.0协议中,客户端

  • HTTP协议详解_动力节点Java学院整理

    一.概念 协议是指计算机通信网络中两台计算机之间进行通信所必须共同遵守的规定或规则,超文本传输协议(HTTP)是一种通信协议,它允许将超文本标记语言(HTML)文档从Web服务器传送到客户端的浏览器. HTTP协议,即超文本传输协议(Hypertext transfer protocol).是一种详细规定了浏览器和万维网(WWW = World Wide Web)服务器之间互相通信的规则,通过因特网传送万维网文档的数据传送协议. HTTP协议是用于从WWW服务器传输超文本到本地浏览器的传送协议.

  • HTTP协议入门_动力节点Java学院整理

    HTTP 协议是互联网的基础协议,也是网页开发的必备知识,最新版本 HTTP/2 更是让它成为技术热点. 本文介绍 HTTP 协议的历史演变和设计思路. 一.HTTP/0.9 HTTP 是基于 TCP/IP 协议的应用层协议.它不涉及数据包(packet)传输,主要规定了客户端和服务器之间的通信格式,默认使用80端口. 最早版本是1991年发布的0.9版.该版本极其简单,只有一个命令GET. GET /index.html 上面命令表示,TCP 连接(connection)建立后,客户端向服务器

  • 基于JAVA中Jersey处理Http协议中的Multipart的详解

    那么Http协议中的Multipart是个什么东东?下面是摘抄http协议1.1的一段话:在multipart entity(多部分实体)的例子中,一个或多个不同的数据集合并在一个单一的body(体)中,一个"multipart"(多部分)类型 field的(域)必须出现在实体的header(头域).body(体)必须包括一个或多个body part(体部分),每一个位于boundary(边界)定界符线之前,最后一个则跟着一个结束边界定界符线.在它的边界定界符线后,每一个体部分由头域.

  • Java与Http协议的详细介绍

    Java与Http协议的详细介绍 引言      http(超文本传输协议)是一个基于请求与响应模式的.无状态的.应用层的协议,常基于TCP的连接方式.HTTP协议的主要特点是:      1.支持客户/服务器模式.      2.简单快速:客户向服务器请求服务时,只需传送请求方法和路径.由于HTTP协议简单,通信速度很快.      3.灵活:HTTP允许传输任意类型的数据对象.类型由Content-Type加以标记.      4.无连接:即每次连接只处理一个请求,处理完客户的请求,并收到客

  • Java HTTP协议收发MQ 消息代码实例详解

    1. 准备环境 在工程 POM 文件添加 HTTP Java 客户端的依赖. <dependency> <groupId>org.eclipse.jetty</groupId> <artifactId>jetty-client</artifactId> <version>9.3.4.RC1</version> </dependency> <dependency> <groupId>com

  • Java中IO流解析及代码实例详解

    目录 1.IO流 1.流和流的分类 什么是IO流? IO流的分类? java.io包下需要掌握的流有16个: 2.如何使用流 1.输入流(读文件):FileInputStream 2.输出流(写文件):FileOutputStream 3.文件的拷贝 总结 1.IO流 1.流和流的分类 什么是IO流? I:Input (输入) O: Ouput(输出) IO流的分类? 有多种分类方式: 一种方式是按照流的方向进行分类: 以内存作为参照物 往内存中去,叫做输入(Input).或者叫做读(Read)

  • JAVA类变量及类方法代码实例详解

    这篇文章主要介绍了JAVA类变量及类方法代码实例详解,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 类变量(static) 类变量是该类的所有对象共享的变量,任何一个该类的对象去访问它时,取到的都是相同的值,同样任何一个该类的对象去修改它时,修改的也是同一个变量. public class C { public static void main(String[] args){ Child ch1 = new Child(12,"小小"

  • Java 中HttpURLConnection附件上传的实例详解

    Java 中HttpURLConnection附件上传的实例详解 整合了一个自己写的采用Http做附件上传的工具,分享一下! 示例代码: /** * 以Http协议传输文件 * * @author mingxue.zhang@163.com * */ public class HttpPostUtil { private final static char[] MULTIPART_CHARS = "-_1234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJK

  • Java中常见死锁与活锁的实例详解

    本文介绍了Java中常见死锁与活锁的实例详解,分享给大家,具体如下: 顺序死锁:过度加锁,导致由于执行顺序的原因,互相持有对方正在等待的锁 资源死锁:多个线程在相同的资源上发生等待 由于调用顺序而产生的死锁 public class Test { Object leftLock = new Object(); Object rightLock = new Object(); public static void main(String[] args) { final Test test = ne

  • java 实现websocket的两种方式实例详解

    一.介绍 1.两种方式,一种使用tomcat的websocket实现,一种使用spring的websocket 2.tomcat的方式需要tomcat 7.x,JEE7的支持. 3.spring与websocket整合需要spring 4.x,并且使用了socketjs,对不支持websocket的浏览器可以模拟websocket使用 二.方式一:tomcat 使用这种方式无需别的任何配置,只需服务端一个处理类, 服务器端代码 package com.Socket; import java.io

  • Android开心消消乐代码实例详解

    突然想要在android上写一个消消乐的代码,在此之前没有系统地学过java的面向对象,也没有任何android相关知识,不过还是会一点C++.8月初开始搭建环境,在这上面花了相当多的时间,然后看了一些视频和电子书,对android有了一个大概的了解,感觉差不多了的时候就开始写了. 疯狂地查阅各种资料,反反复复了好几天后,也算是写出了个成品.原计划有很多地方还是可以继续写下去的,比如UI设计,比如动画特效,时间设计,关卡设计,以及与数据库的连接,如果可以的话还能写个联网功能,当然因为写到后期内心

  • PHP队列场景以及实现代码实例详解

    为了降低单点压力,通常会根据业务情况进行分表分库,将表分布在不同的库中(库可能分布在不同的机器上),但是一个业务场景可能会同时处理两个表的操作.在这种场景下,事务的提交会变得相对复杂,因为多个节点(库)的存在,可能存在部分节点提交失败的情况,即事务的ACID特性需要在各个不同的数据库实例中保证.比如更新db1库的A表时,必须同步更新db2库的B表,两个更新形成一个事务,要么都成功,要么都失败. 那么我们如何利用mysql实现分布式数据库的事务呢? mysql是从5.0开始支持分布式事务 这里先声

  • FasfDFS整合Java实现文件上传下载功能实例详解

    在上篇文章给大家介绍了FastDFS安装和配置整合Nginx-1.13.3的方法,大家可以点击查看下. 今天使用Java代码实现文件的上传和下载.对此作者提供了Java API支持,下载fastdfs-client-java将源码添加到项目中.或者在Maven项目pom.xml文件中添加依赖 <dependency> <groupId>org.csource</groupId> <artifactId>fastdfs-client-java</arti

  • java 中Excel转shape file的实例详解

    java  中Excel转shape file的实例详解 概述: 本文讲述如何结合geotools和POI实现Excel到shp的转换,再结合前文shp到geojson数据的转换,即可实现用户上传excel数据并在web端的展示功能. 截图: 原始Excel文件 运行耗时 运行结果 代码: package com.lzugis.geotools; import com.lzugis.CommonMethod; import com.vividsolutions.jts.geom.Coordina

随机推荐