1 package com.eviware.soapui.impl.wsdl.submit.transports.jms;
2
3 import hermes.Hermes;
4
5 import java.io.BufferedReader;
6 import java.io.File;
7 import java.io.FileOutputStream;
8 import java.io.IOException;
9 import java.io.InputStream;
10 import java.io.InputStreamReader;
11 import java.io.OutputStream;
12 import java.util.ArrayList;
13 import java.util.Calendar;
14 import java.util.List;
15
16 import javax.jms.BytesMessage;
17 import javax.jms.Connection;
18 import javax.jms.JMSException;
19 import javax.jms.MapMessage;
20 import javax.jms.Message;
21 import javax.jms.MessageConsumer;
22 import javax.jms.MessageProducer;
23 import javax.jms.Queue;
24 import javax.jms.Session;
25 import javax.jms.TextMessage;
26 import javax.jms.Topic;
27 import javax.naming.NamingException;
28
29 import org.apache.commons.io.output.ByteArrayOutputStream;
30 import org.apache.commons.lang.NotImplementedException;
31
32 import com.eviware.soapui.SoapUI;
33 import com.eviware.soapui.impl.rest.RestRequest;
34 import com.eviware.soapui.impl.support.AbstractHttpRequest;
35 import com.eviware.soapui.impl.wsdl.WsdlProject;
36 import com.eviware.soapui.impl.wsdl.WsdlRequest;
37 import com.eviware.soapui.impl.wsdl.submit.RequestFilter;
38 import com.eviware.soapui.impl.wsdl.submit.RequestTransport;
39 import com.eviware.soapui.impl.wsdl.submit.RequestTransportRegistry.CannotResolveJmsTypeException;
40 import com.eviware.soapui.impl.wsdl.submit.RequestTransportRegistry.MissingTransportException;
41 import com.eviware.soapui.impl.wsdl.submit.transports.jms.util.HermesUtils;
42 import com.eviware.soapui.impl.wsdl.submit.transports.jms.util.JMSUtils;
43 import com.eviware.soapui.impl.wsdl.support.RequestFileAttachment;
44 import com.eviware.soapui.impl.wsdl.support.jms.header.JMSHeaderConfig;
45 import com.eviware.soapui.impl.wsdl.teststeps.HttpTestRequest;
46 import com.eviware.soapui.model.iface.Attachment;
47 import com.eviware.soapui.model.iface.Request;
48 import com.eviware.soapui.model.iface.Response;
49 import com.eviware.soapui.model.iface.SubmitContext;
50 import com.eviware.soapui.model.propertyexpansion.PropertyExpander;
51 import com.eviware.soapui.model.support.ModelSupport;
52
53 public class HermesJmsRequestTransport implements RequestTransport
54 {
55
56 public static final String JMS_MESSAGE_RECEIVE = "JMS_MESSAGE_RECEIVE";
57 public static final String JMS_MESSAGE_SEND = "JMS_MESSAGE_SEND";
58 public static final String JMS_RESPONSE = "JMS_RESPONSE";
59 public static final String JMS_ERROR = "JMS_ERROR";
60 public static final String JMS_RECEIVE_TIMEOUT = "JMS_RECEIVE_TIMEOUT";
61
62 protected String username;
63 protected String password;
64 protected JMSEndpoint jmsEndpoint;
65 protected String durableSubscriptionName;
66 protected String clientID;
67 protected String messageSelector;
68 protected Hermes hermes;
69 protected List<RequestFilter> filters = new ArrayList<RequestFilter>();
70
71 public void abortRequest( SubmitContext submitContext )
72 {
73 }
74
75 public void addRequestFilter( RequestFilter filter )
76 {
77 filters.add( filter );
78 }
79
80 public void removeRequestFilter( RequestFilter filter )
81 {
82 filters.remove( filter );
83 }
84
85 public Response sendRequest( SubmitContext submitContext, Request request ) throws Exception
86 {
87 long timeStarted = Calendar.getInstance().getTimeInMillis();
88 submitContext.setProperty( JMS_RECEIVE_TIMEOUT, getTimeout( submitContext, request ) );
89
90 return resolveType( submitContext, request ).execute( submitContext, request, timeStarted );
91 }
92
93 protected void init( SubmitContext submitContext, Request request ) throws NamingException
94 {
95 this.jmsEndpoint = new JMSEndpoint( request, submitContext );
96 this.hermes = getHermes( jmsEndpoint.getSessionName(), request );
97 this.username = submitContext.expand( request.getUsername() );
98 this.password = submitContext.expand( request.getPassword() );
99 JMSHeaderConfig jmsConfig = ( ( AbstractHttpRequest<?> )request ).getJMSHeaderConfig();
100 this.durableSubscriptionName = submitContext.expand(jmsConfig.getDurableSubscriptionName());
101 this.clientID = submitContext.expand(jmsConfig.getClientID());
102 this.messageSelector = submitContext.expand(jmsConfig.getMessageSelector());
103
104
105 }
106
107 protected Response execute( SubmitContext submitContext, Request request, long timeStarted ) throws Exception
108 {
109 throw new NotImplementedException();
110 }
111
112 private HermesJmsRequestTransport resolveType( SubmitContext submitContext, Request request )
113 throws CannotResolveJmsTypeException, MissingTransportException
114 {
115 int ix = request.getEndpoint().indexOf( "://" );
116 if( ix == -1 )
117 throw new MissingTransportException( "Missing protocol in endpoint [" + request.getEndpoint() + "]" );
118
119 String[] params = JMSEndpoint.extractEndpointParameters( request );
120
121
122 if( params.length == 2 )
123 {
124 String destinationName = PropertyExpander.expandProperties( submitContext, params[1] );
125 if( destinationName.startsWith( JMSEndpoint.QUEUE_ENDPOINT_PREFIX ) )
126 {
127 return new HermesJmsRequestSendTransport();
128 }
129 else if( destinationName.startsWith( JMSEndpoint.TOPIC_ENDPOINT_PREFIX ) )
130 {
131 return new HermesJmsRequestPublishTransport();
132 }
133 else
134 {
135 cannotResolve();
136 }
137
138 }
139
140 else if( params.length == 3 && PropertyExpander.expandProperties( submitContext, params[1] ).equals( "-" ) )
141 {
142 String destinationName = PropertyExpander.expandProperties( submitContext, params[2] );
143 if( destinationName.startsWith( JMSEndpoint.QUEUE_ENDPOINT_PREFIX ) )
144 {
145 return new HermesJmsRequestReceiveTransport();
146 }
147 else if( destinationName.startsWith( JMSEndpoint.TOPIC_ENDPOINT_PREFIX ) )
148 {
149 return new HermesJmsRequestSubscribeTransport();
150 }
151 else
152 {
153 cannotResolve();
154 }
155 }
156
157 else if( params.length == 3 )
158 {
159 String destinationSendName = PropertyExpander.expandProperties( submitContext, params[1] );
160 String destinationReceiveName = PropertyExpander.expandProperties( submitContext, params[2] );
161 if( destinationSendName.startsWith( JMSEndpoint.QUEUE_ENDPOINT_PREFIX )
162 && destinationReceiveName.startsWith( JMSEndpoint.QUEUE_ENDPOINT_PREFIX ) )
163 {
164 return new HermesJmsRequestSendReceiveTransport();
165 }
166 else if( destinationSendName.startsWith( JMSEndpoint.QUEUE_ENDPOINT_PREFIX )
167 && destinationReceiveName.startsWith( JMSEndpoint.TOPIC_ENDPOINT_PREFIX ) )
168 {
169 return new HermesJmsRequestSendSubscribeTransport();
170 }
171 else if( destinationSendName.startsWith( JMSEndpoint.TOPIC_ENDPOINT_PREFIX )
172 && destinationReceiveName.startsWith( JMSEndpoint.TOPIC_ENDPOINT_PREFIX ) )
173 {
174 return new HermesJmsRequestPublishSubscribeTransport();
175 }
176 else if( destinationSendName.startsWith( JMSEndpoint.TOPIC_ENDPOINT_PREFIX )
177 && destinationReceiveName.startsWith( JMSEndpoint.QUEUE_ENDPOINT_PREFIX ) )
178 {
179 return new HermesJmsRequestPublishReceiveTransport();
180 }
181 else
182 {
183 cannotResolve();
184 }
185 }
186 else
187 {
188 cannotResolve();
189 }
190 return null;
191 }
192
193 private static void cannotResolve() throws CannotResolveJmsTypeException
194 {
195 throw new CannotResolveJmsTypeException(
196 "\nBad jms alias! \nFor JMS please use this endpont pattern:\nfor sending 'jms://sessionName/queue_myqueuename' \nfor receive 'jms://sessionName/-/queue_myqueuename'\nfor send-receive 'jms://sessionName/queue_myqueuename1/queue_myqueuename2'" );
197 }
198
199 protected Hermes getHermes( String sessionName, Request request ) throws NamingException
200 {
201 WsdlProject project = ( WsdlProject )ModelSupport.getModelItemProject( request );
202 return HermesUtils.getHermes( project, sessionName );
203 }
204
205 protected long getTimeout( SubmitContext submitContext, Request request )
206 {
207 String timeout = PropertyExpander.expandProperties( submitContext, request.getTimeout() );
208 long to = 0;
209 try
210 {
211 to = Long.parseLong( timeout );
212 }
213 catch( Exception e )
214 {
215 }
216
217 return to;
218 }
219
220 protected JMSHeader createJMSHeader( SubmitContext submitContext, Request request, Hermes hermes, Message message )
221 {
222 JMSHeader jmsHeader = new JMSHeader();
223 jmsHeader.setMessageHeaders( message, request, hermes, submitContext );
224 JMSHeader.setMessageProperties( message, request, hermes, submitContext );
225 return jmsHeader;
226 }
227
228 protected void closeSessionAndConnection( Connection connection, Session session ) throws JMSException
229 {
230 if( session != null )
231 session.close();
232 if( connection != null )
233 connection.close();
234 }
235
236 protected Response errorResponse( SubmitContext submitContext, Request request, long timeStarted, JMSException jmse )
237 {
238 JMSResponse response;
239 SoapUI.logError( jmse );
240 submitContext.setProperty( JMS_ERROR, jmse );
241 response = new JMSResponse( "", null, null, request, timeStarted );
242 submitContext.setProperty( JMS_RESPONSE, response );
243 return response;
244 }
245
246 protected Message messageSend( SubmitContext submitContext, Request request, Session session, Hermes hermes,
247 Queue queueSend ) throws JMSException
248 {
249 MessageProducer messageProducer = session.createProducer( queueSend );
250 Message messageSend = createMessage( submitContext, request, session );
251 return send( submitContext, request, hermes, messageProducer, messageSend );
252 }
253
254 protected Message messagePublish( SubmitContext submitContext, Request request, Session topicSession,
255 Hermes hermes, Topic topicPublish ) throws JMSException
256 {
257 MessageProducer topicPublisher = topicSession.createProducer( topicPublish );
258 Message messagePublish = createMessage( submitContext, request, topicSession );
259 return send( submitContext, request, hermes, topicPublisher, messagePublish );
260 }
261
262 private Message send( SubmitContext submitContext, Request request, Hermes hermes, MessageProducer messageProducer,
263 Message message ) throws JMSException
264 {
265 JMSHeader jmsHeader = createJMSHeader( submitContext, request, hermes, message );
266 messageProducer.send( message, message.getJMSDeliveryMode(), message.getJMSPriority(), jmsHeader.getTimeTolive() );
267 submitContext.setProperty( JMS_MESSAGE_SEND, message );
268 return message;
269 }
270
271 protected Response makeResponse( SubmitContext submitContext, Request request, long timeStarted,
272 Message messageSend, MessageConsumer messageConsumer ) throws JMSException
273 {
274 long timeout = getTimeout( submitContext, request );
275 Message messageReceive = messageConsumer.receive( timeout );
276 if( messageReceive != null )
277 {
278 JMSResponse response = resolveMessage( request, timeStarted, messageSend, messageReceive );
279 submitContext.setProperty( JMS_MESSAGE_RECEIVE, messageReceive );
280 submitContext.setProperty( JMS_RESPONSE, response );
281 return response;
282 }
283 else
284 {
285 return new JMSResponse( "", null, null, request, timeStarted );
286 }
287 }
288
289 private JMSResponse resolveMessage( Request request, long timeStarted, Message messageSend, Message messageReceive )
290 throws JMSException
291 {
292 if( messageReceive instanceof TextMessage )
293 {
294 TextMessage textMessageReceive = ( TextMessage )messageReceive;
295 return new JMSResponse( textMessageReceive.getText(), messageSend, textMessageReceive, request, timeStarted );
296 }
297 else if( messageReceive instanceof MapMessage )
298 {
299 MapMessage mapMessageReceive = ( MapMessage )messageReceive;
300 return new JMSResponse( JMSUtils.extractMapMessagePayloadToString( mapMessageReceive ), messageSend,
301 mapMessageReceive, request, timeStarted );
302 }
303 else if( messageReceive instanceof BytesMessage )
304 {
305 BytesMessage bytesMessageReceive = ( BytesMessage )messageReceive;
306 JMSResponse jmsResponse = new JMSResponse( "", messageSend, bytesMessageReceive, request, timeStarted );
307 addAttachment( request, bytesMessageReceive, jmsResponse );
308 return jmsResponse;
309 }
310 return null;
311 }
312
313 protected Response makeEmptyResponse( SubmitContext submitContext, Request request, long timeStarted,
314 Message messageSend )
315 {
316 JMSResponse response = new JMSResponse( "", messageSend, null, request, timeStarted );
317 submitContext.setProperty( JMS_RESPONSE, response );
318 return response;
319 }
320
321 private Message createMessage( SubmitContext submitContext, Request request, Session session ) throws JMSException
322 {
323 if( request instanceof WsdlRequest || request instanceof HttpTestRequest || request instanceof RestRequest )
324 {
325 if( hasAttachment( request ) )
326 {
327 if( isTextAttachment( request ) )
328 {
329 return createTextMessageFromAttachment( submitContext, request, session );
330 }
331 else
332 {
333 return createBytesMessage( request, session );
334 }
335 }
336 else
337 {
338 return createTextMessage( submitContext, request, session );
339 }
340 }
341
342 return null;
343 }
344
345 private Message createTextMessageFromAttachment( SubmitContext submitContext, Request request, Session session )
346 {
347 try
348 {
349 String content = convertStreamToString( request.getAttachments()[0].getInputStream() );
350 TextMessage textMessageSend = session.createTextMessage();
351 String messageBody = PropertyExpander.expandProperties( submitContext, content );
352 textMessageSend.setText( messageBody );
353 return textMessageSend;
354 }
355 catch( Exception e )
356 {
357 SoapUI.logError( e );
358 }
359 return null;
360 }
361
362 private String convertStreamToString( InputStream is )
363 {
364 BufferedReader reader = new BufferedReader( new InputStreamReader( is ) );
365 StringBuilder sb = new StringBuilder();
366
367 String line = null;
368 try
369 {
370 while( ( line = reader.readLine() ) != null )
371 {
372 sb.append( line + "\n" );
373 }
374 }
375 catch( IOException e )
376 {
377 e.printStackTrace();
378 }
379 finally
380 {
381 try
382 {
383 is.close();
384 }
385 catch( IOException e )
386 {
387 e.printStackTrace();
388 }
389 }
390 return sb.toString();
391 }
392
393 private boolean hasAttachment( Request request )
394 {
395 if( request.getAttachments().length > 0 )
396 return true;
397 return false;
398 }
399
400 private Message createTextMessage( SubmitContext submitContext, Request request, Session session )
401 throws JMSException
402 {
403 TextMessage textMessageSend = session.createTextMessage();
404 String messageBody = PropertyExpander.expandProperties( submitContext, request.getRequestContent() );
405 textMessageSend.setText( messageBody );
406 return textMessageSend;
407 }
408
409 private boolean isTextAttachment( Request request )
410 {
411 if( request.getAttachments().length > 0
412 && ( request.getAttachments()[0].getContentType().contains( "/text" )
413 || request.getAttachments()[0].getContentType().contains( "/xml" ) || request.getAttachments()[0]
414 .getContentType().contains( "text/plain" ) ) )
415 {
416 return true;
417 }
418 return false;
419 }
420
421 private Message createBytesMessage( Request request, Session session )
422 {
423 try
424 {
425 InputStream in = request.getAttachments()[0].getInputStream();
426 int buff = -1;
427 ByteArrayOutputStream baos = new ByteArrayOutputStream();
428 while( ( buff = in.read() ) != -1 )
429 {
430 baos.write( buff );
431 }
432 BytesMessage bytesMessage = session.createBytesMessage();
433 bytesMessage.writeBytes( baos.toByteArray() );
434 return bytesMessage;
435 }
436 catch( Exception e )
437 {
438 SoapUI.logError( e );
439 }
440 return null;
441 }
442
443 private void addAttachment( Request request, BytesMessage bytesMessageReceive, JMSResponse jmsResponse )
444 throws JMSException
445 {
446 try
447 {
448 byte[] buff = new byte[1];
449 File temp = File.createTempFile( "bytesmessage", ".tmp" );
450 OutputStream out = new FileOutputStream( temp );
451 while( bytesMessageReceive.readBytes( buff ) != -1 )
452 {
453 out.write( buff );
454 }
455 out.close();
456 Attachment[] attachments = new Attachment[] { new RequestFileAttachment( temp, false,
457 ( AbstractHttpRequest<?> )request ) };
458 jmsResponse.setAttachments( attachments );
459 }
460 catch( IOException e )
461 {
462 SoapUI.logError( e );
463 }
464 }
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503 @SuppressWarnings( "serial" )
504 public static class UnresolvedJMSEndpointException extends Exception
505 {
506 public UnresolvedJMSEndpointException( String msg )
507 {
508 super( msg );
509 }
510 }
511
512 }