记录一次冷门技术 Oracle AQ(Advanced Queuing,高级队列)的使用
版本
oracle 11g
创建用户
-- Create the demo user that owns the queue objects.
-- The password starts with a digit, so it must be double-quoted;
-- an unquoted `identified by 123456` is rejected with ORA-00988.
create user testaq identified by "123456";
grant connect, resource to testaq;

-- Roles and package privileges needed to administer and use Advanced Queuing.
-- Each privilege is granted once; repeating a grant has no additional effect.
grant aq_administrator_role to testaq;
grant aq_user_role to testaq;
grant execute on dbms_aq to testaq;
grant execute on dbms_aqadm to testaq;
grant execute on dbms_aqin to testaq;
grant execute any procedure to testaq;
-- "with admin option" already implies the plain "create procedure" grant.
grant create procedure to testaq with admin option;

-- Allow testaq to enqueue to and dequeue from any queue.
-- The third argument (admin_option) is false: testaq cannot pass these on.
begin
    dbms_aqadm.grant_system_privilege('enqueue_any', 'testaq', false);
    dbms_aqadm.grant_system_privilege('dequeue_any', 'testaq', false);
end;
/
创建队列表
-- Create the queue table that backs the queue.
-- Messages are stored as SYS.XMLTYPE and the queue is single-consumer.
begin
    dbms_aqadm.create_queue_table(
        queue_table        => 'testaq.xml_queue_table',
        queue_payload_type => 'SYS.XMLTYPE',
        multiple_consumers => false
    );
end;
/
创建队列及启动队列
-- Create the queue on top of testaq.xml_queue_table, then start it.
begin
    dbms_aqadm.create_queue(
        queue_name  => 'testaq.xml_queue',
        queue_table => 'testaq.xml_queue_table'
    );
    dbms_aqadm.start_queue(
        queue_name => 'testaq.xml_queue'
    );
end;
/
停止及删除队列
-- Tear-down, in order: stop the queue, drop the queue, then drop its queue table.
begin
    dbms_aqadm.stop_queue(queue_name => 'testaq.xml_queue');
    dbms_aqadm.drop_queue(queue_name => 'testaq.xml_queue');
    dbms_aqadm.drop_queue_table(queue_table => 'testaq.xml_queue_table');
end;
/
发送消息
-- Enqueue one XML message into testaq.xml_queue.
-- The payload type must match the queue table's payload type (SYS.XMLTYPE),
-- and the queue name must be the queue that was created and started.
declare
    r_enqueue_options    DBMS_AQ.ENQUEUE_OPTIONS_T;
    r_message_properties DBMS_AQ.MESSAGE_PROPERTIES_T;
    v_message_handle     RAW(16);
    o_payload            SYS.XMLTYPE;
begin
    o_payload := SYS.XMLTYPE('<ROOT><ROWSET><ROW><APPLYNO>test</APPLYNO></ROW></ROWSET></ROOT>');
    dbms_aq.enqueue(
        queue_name         => 'testaq.xml_queue',
        enqueue_options    => r_enqueue_options,
        message_properties => r_message_properties,
        payload            => o_payload,
        msgid              => v_message_handle
    );
    -- The enqueue is followed by an explicit commit to end the transaction.
    commit;
end;
/
Java接收消息
# Oracle AQ connection settings; the keys must be nested under "oracle-aq"
# so that they match the prefix used by OracleAqJmsConfig.
oracle-aq:
  jdbcUrl: jdbc:oracle:thin:@localhost:1521:testaq
  username: testaq
  password: 123456
  # Schema that owns the queue, and the queue name within that schema.
  queueNameUser: testaq
  queueName: xml_queue
/**
 * Oracle AQ connection settings, bound from configuration properties
 * under the {@code oracle-aq} prefix.
 */
@Data
@Component
@ConfigurationProperties(prefix = "oracle-aq")
public class OracleAqJmsConfig {

    /** JDBC URL of the database that hosts the queue. */
    private String jdbcUrl;

    /** Database user used to open the queue connection. */
    private String username;

    /** Password of {@link #username}. */
    private String password;

    /** Schema (owner) of the queue. */
    private String queueNameUser;

    /** Name of the queue inside {@link #queueNameUser}. */
    private String queueName;
}
import lombok.extern.slf4j.Slf4j;
import oracle.jms.AQjmsAdtMessage;
import oracle.jms.AQjmsDestination;
import oracle.jms.AQjmsFactory;
import oracle.jms.AQjmsSession;
import oracle.xdb.XMLType;
import oracle.xdb.XMLTypeFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.jms.*;
import javax.xml.bind.JAXBException;
import java.util.Properties;
@Service
@Slf4j
public class TestOracleAq {
@Autowired
private OracleAqJmsConfig config;
@PostConstruct
public void messageListener() throws JMSException {
QueueConnectionFactory queueConnectionFactory = AQjmsFactory.getQueueConnectionFactory(config.getJdbcUrl(), new Properties());
QueueConnection conn = queueConnectionFactory.createQueueConnection(config.getUsername(), config.getPassword());
AQjmsSession session = (AQjmsSession)conn.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
conn.start();
Queue queue = (AQjmsDestination)session.getQueue(config.getQueueNameUser(), config.getQueueName());
XMLTypeFactory factory = new XMLTypeFactory();
MessageConsumer consumer = session.createConsumer(queue, null, factory, null, false);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
AQjmsAdtMessage adtMessage = (AQjmsAdtMessage) message;
try {
Object adtPayload = adtMessage.getAdtPayload();
XMLType xmlType = (XMLType)adtPayload;
saveXml(xmlType.getStringVal());
log.info("接收到oracle aq数据:{}", xmlType.getStringVal());
} catch (Exception e) {
log.error("", e);
}
}
});
}
public void saveXml(String xml) throws JAXBException {
// todo ...
}
}
依赖
在 Oracle 安装目录中查找以下 jar 包,并复制到项目的 libs 目录(与下面各依赖的 systemPath 对应)
<!-- oracle aq -->
<!-- Every artifact below uses system scope and is loaded from the project's libs/ directory. -->
<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>ojdbc6</artifactId>
    <version>11.1.0.7.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/libs/ojdbc6.jar</systemPath>
</dependency>
<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>jmscommon</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/libs/jmscommon.jar</systemPath>
</dependency>
<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>orai18n</artifactId>
    <version>11.1.0.7.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/libs/orai18n.jar</systemPath>
</dependency>
<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>jta</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/libs/jta.jar</systemPath>
</dependency>
<dependency>
    <groupId>com.oracle</groupId>
    <artifactId>aqapi_g</artifactId>
    <version>1.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/libs/aqapi_g.jar</systemPath>
</dependency>
<dependency>
    <groupId>oracle.xdb</groupId>
    <artifactId>xdb</artifactId>
    <version>21.9.0.0</version>
    <scope>system</scope>
    <systemPath>${project.basedir}/libs/xdb.jar</systemPath>
</dependency>
<!-- oracle aq -->