View Javadoc

1   package com.eviware.soapui.impl.wsdl.submit.transports.jms;
2   
3   import hermes.Hermes;
4   
5   import java.io.BufferedReader;
6   import java.io.File;
7   import java.io.FileOutputStream;
8   import java.io.IOException;
9   import java.io.InputStream;
10  import java.io.InputStreamReader;
11  import java.io.OutputStream;
12  import java.util.ArrayList;
13  import java.util.Calendar;
14  import java.util.List;
15  
16  import javax.jms.BytesMessage;
17  import javax.jms.Connection;
18  import javax.jms.JMSException;
19  import javax.jms.MapMessage;
20  import javax.jms.Message;
21  import javax.jms.MessageConsumer;
22  import javax.jms.MessageProducer;
23  import javax.jms.Queue;
24  import javax.jms.Session;
25  import javax.jms.TextMessage;
26  import javax.jms.Topic;
27  import javax.naming.NamingException;
28  
29  import org.apache.commons.io.output.ByteArrayOutputStream;
30  import org.apache.commons.lang.NotImplementedException;
31  
32  import com.eviware.soapui.SoapUI;
33  import com.eviware.soapui.impl.rest.RestRequest;
34  import com.eviware.soapui.impl.support.AbstractHttpRequest;
35  import com.eviware.soapui.impl.wsdl.WsdlProject;
36  import com.eviware.soapui.impl.wsdl.WsdlRequest;
37  import com.eviware.soapui.impl.wsdl.submit.RequestFilter;
38  import com.eviware.soapui.impl.wsdl.submit.RequestTransport;
39  import com.eviware.soapui.impl.wsdl.submit.RequestTransportRegistry.CannotResolveJmsTypeException;
40  import com.eviware.soapui.impl.wsdl.submit.RequestTransportRegistry.MissingTransportException;
41  import com.eviware.soapui.impl.wsdl.submit.transports.jms.util.HermesUtils;
42  import com.eviware.soapui.impl.wsdl.submit.transports.jms.util.JMSUtils;
43  import com.eviware.soapui.impl.wsdl.support.RequestFileAttachment;
44  import com.eviware.soapui.impl.wsdl.support.jms.header.JMSHeaderConfig;
45  import com.eviware.soapui.impl.wsdl.teststeps.HttpTestRequest;
46  import com.eviware.soapui.model.iface.Attachment;
47  import com.eviware.soapui.model.iface.Request;
48  import com.eviware.soapui.model.iface.Response;
49  import com.eviware.soapui.model.iface.SubmitContext;
50  import com.eviware.soapui.model.propertyexpansion.PropertyExpander;
51  import com.eviware.soapui.model.support.ModelSupport;
52  
53  public class HermesJmsRequestTransport implements RequestTransport
54  {
55  
56  	public static final String JMS_MESSAGE_RECEIVE = "JMS_MESSAGE_RECEIVE";
57  	public static final String JMS_MESSAGE_SEND = "JMS_MESSAGE_SEND";
58  	public static final String JMS_RESPONSE = "JMS_RESPONSE";
59  	public static final String JMS_ERROR = "JMS_ERROR";
60  	public static final String JMS_RECEIVE_TIMEOUT = "JMS_RECEIVE_TIMEOUT";
61  
62  	protected String username;
63  	protected String password;
64  	protected JMSEndpoint jmsEndpoint;
65  	protected String durableSubscriptionName;
66  	protected String clientID;
67  	protected String messageSelector;
68  	protected Hermes hermes;
69  	protected List<RequestFilter> filters = new ArrayList<RequestFilter>();
70  
71  	public void abortRequest( SubmitContext submitContext )
72  	{
73  	}
74  
75  	public void addRequestFilter( RequestFilter filter )
76  	{
77  		filters.add( filter );
78  	}
79  
80  	public void removeRequestFilter( RequestFilter filter )
81  	{
82  		filters.remove( filter );
83  	}
84  
85  	public Response sendRequest( SubmitContext submitContext, Request request ) throws Exception
86  	{
87  		long timeStarted = Calendar.getInstance().getTimeInMillis();
88  		submitContext.setProperty( JMS_RECEIVE_TIMEOUT, getTimeout( submitContext, request ) );
89  
90  		return resolveType( submitContext, request ).execute( submitContext, request, timeStarted );
91  	}
92  
93  	protected void init( SubmitContext submitContext, Request request ) throws NamingException
94  	{
95  		this.jmsEndpoint = new JMSEndpoint( request, submitContext );
96  		this.hermes = getHermes( jmsEndpoint.getSessionName(), request );
97  		this.username = submitContext.expand( request.getUsername() );
98  		this.password = submitContext.expand( request.getPassword() );
99  		JMSHeaderConfig jmsConfig = ( ( AbstractHttpRequest<?> )request ).getJMSHeaderConfig();
100    	this.durableSubscriptionName = submitContext.expand(jmsConfig.getDurableSubscriptionName());
101    	this.clientID = submitContext.expand(jmsConfig.getClientID());
102    	this.messageSelector = submitContext.expand(jmsConfig.getMessageSelector());
103 			
104 
105 	}
106 
107 	protected Response execute( SubmitContext submitContext, Request request, long timeStarted ) throws Exception
108 	{
109 		throw new NotImplementedException();
110 	}
111 
112 	private HermesJmsRequestTransport resolveType( SubmitContext submitContext, Request request )
113 			throws CannotResolveJmsTypeException, MissingTransportException
114 	{
115 		int ix = request.getEndpoint().indexOf( "://" );
116 		if( ix == -1 )
117 			throw new MissingTransportException( "Missing protocol in endpoint [" + request.getEndpoint() + "]" );
118 
119 		String[] params = JMSEndpoint.extractEndpointParameters( request );
120 
121 		// resolve sending class
122 		if( params.length == 2 )
123 		{
124 			String destinationName = PropertyExpander.expandProperties( submitContext, params[1] );
125 			if( destinationName.startsWith( JMSEndpoint.QUEUE_ENDPOINT_PREFIX ) )
126 			{
127 				return new HermesJmsRequestSendTransport();
128 			}
129 			else if( destinationName.startsWith( JMSEndpoint.TOPIC_ENDPOINT_PREFIX ) )
130 			{
131 				return new HermesJmsRequestPublishTransport();
132 			}
133 			else
134 			{
135 				cannotResolve();
136 			}
137 
138 		}
139 		// resolve receiving class
140 		else if( params.length == 3 && PropertyExpander.expandProperties( submitContext, params[1] ).equals( "-" ) )
141 		{
142 			String destinationName = PropertyExpander.expandProperties( submitContext, params[2] );
143 			if( destinationName.startsWith( JMSEndpoint.QUEUE_ENDPOINT_PREFIX ) )
144 			{
145 				return new HermesJmsRequestReceiveTransport();
146 			}
147 			else if( destinationName.startsWith( JMSEndpoint.TOPIC_ENDPOINT_PREFIX ) )
148 			{
149 				return new HermesJmsRequestSubscribeTransport();
150 			}
151 			else
152 			{
153 				cannotResolve();
154 			}
155 		}
156 		// resolve send-receive class
157 		else if( params.length == 3 )
158 		{
159 			String destinationSendName = PropertyExpander.expandProperties( submitContext, params[1] );
160 			String destinationReceiveName = PropertyExpander.expandProperties( submitContext, params[2] );
161 			if( destinationSendName.startsWith( JMSEndpoint.QUEUE_ENDPOINT_PREFIX )
162 					&& destinationReceiveName.startsWith( JMSEndpoint.QUEUE_ENDPOINT_PREFIX ) )
163 			{
164 				return new HermesJmsRequestSendReceiveTransport();
165 			}
166 			else if( destinationSendName.startsWith( JMSEndpoint.QUEUE_ENDPOINT_PREFIX )
167 					&& destinationReceiveName.startsWith( JMSEndpoint.TOPIC_ENDPOINT_PREFIX ) )
168 			{
169 				return new HermesJmsRequestSendSubscribeTransport();
170 			}
171 			else if( destinationSendName.startsWith( JMSEndpoint.TOPIC_ENDPOINT_PREFIX )
172 					&& destinationReceiveName.startsWith( JMSEndpoint.TOPIC_ENDPOINT_PREFIX ) )
173 			{
174 				return new HermesJmsRequestPublishSubscribeTransport();
175 			}
176 			else if( destinationSendName.startsWith( JMSEndpoint.TOPIC_ENDPOINT_PREFIX )
177 					&& destinationReceiveName.startsWith( JMSEndpoint.QUEUE_ENDPOINT_PREFIX ) )
178 			{
179 				return new HermesJmsRequestPublishReceiveTransport();
180 			}
181 			else
182 			{
183 				cannotResolve();
184 			}
185 		}
186 		else
187 		{
188 			cannotResolve();
189 		}
190 		return null;
191 	}
192 
193 	private static void cannotResolve() throws CannotResolveJmsTypeException
194 	{
195 		throw new CannotResolveJmsTypeException(
196 				"\nBad jms alias! \nFor JMS please use this endpont pattern:\nfor sending 'jms://sessionName/queue_myqueuename' \nfor receive  'jms://sessionName/-/queue_myqueuename'\nfor send-receive 'jms://sessionName/queue_myqueuename1/queue_myqueuename2'" );
197 	}
198 
199 	protected Hermes getHermes( String sessionName, Request request ) throws NamingException
200 	{
201 		WsdlProject project = ( WsdlProject )ModelSupport.getModelItemProject( request );
202 		return HermesUtils.getHermes( project, sessionName );
203 	}
204 
205 	protected long getTimeout( SubmitContext submitContext, Request request )
206 	{
207 		String timeout = PropertyExpander.expandProperties( submitContext, request.getTimeout() );
208 		long to = 0;
209 		try
210 		{
211 			to = Long.parseLong( timeout );
212 		}
213 		catch( Exception e )
214 		{
215 		}
216 
217 		return to;
218 	}
219 
220 	protected JMSHeader createJMSHeader( SubmitContext submitContext, Request request, Hermes hermes, Message message )
221 	{
222 		JMSHeader jmsHeader = new JMSHeader();
223 		jmsHeader.setMessageHeaders( message, request, hermes, submitContext );
224 		JMSHeader.setMessageProperties( message, request, hermes, submitContext );
225 		return jmsHeader;
226 	}
227 
228 	protected void closeSessionAndConnection( Connection connection, Session session ) throws JMSException
229 	{
230 		if( session != null )
231 			session.close();
232 		if( connection != null )
233 			connection.close();
234 	}
235 
236 	protected Response errorResponse( SubmitContext submitContext, Request request, long timeStarted, JMSException jmse )
237 	{
238 		JMSResponse response;
239 		SoapUI.logError( jmse );
240 		submitContext.setProperty( JMS_ERROR, jmse );
241 		response = new JMSResponse( "", null, null, request, timeStarted );
242 		submitContext.setProperty( JMS_RESPONSE, response );
243 		return response;
244 	}
245 
246 	protected Message messageSend( SubmitContext submitContext, Request request, Session session, Hermes hermes,
247 			Queue queueSend ) throws JMSException
248 	{
249 		MessageProducer messageProducer = session.createProducer( queueSend );
250 		Message messageSend = createMessage( submitContext, request, session );
251 		return send( submitContext, request, hermes, messageProducer, messageSend );
252 	}
253 
254 	protected Message messagePublish( SubmitContext submitContext, Request request, Session topicSession,
255 			Hermes hermes, Topic topicPublish ) throws JMSException
256 	{
257 		 MessageProducer topicPublisher = topicSession.createProducer(  topicPublish );
258 		Message messagePublish = createMessage( submitContext, request, topicSession );
259 		return send( submitContext, request, hermes, topicPublisher, messagePublish );
260 	}
261 
262 	private Message send( SubmitContext submitContext, Request request, Hermes hermes, MessageProducer messageProducer,
263 			Message message ) throws JMSException
264 	{
265 		JMSHeader jmsHeader = createJMSHeader( submitContext, request, hermes, message );
266 		messageProducer.send( message, message.getJMSDeliveryMode(), message.getJMSPriority(), jmsHeader.getTimeTolive() );
267 		submitContext.setProperty( JMS_MESSAGE_SEND, message );
268 		return message;
269 	}
270 
271 	protected Response makeResponse( SubmitContext submitContext, Request request, long timeStarted,
272 			Message messageSend, MessageConsumer messageConsumer ) throws JMSException
273 	{
274 		long timeout = getTimeout( submitContext, request );
275 		Message messageReceive = messageConsumer.receive( timeout );
276 		if( messageReceive != null )
277 		{
278 			JMSResponse response = resolveMessage( request, timeStarted, messageSend, messageReceive );
279 			submitContext.setProperty( JMS_MESSAGE_RECEIVE, messageReceive );
280 			submitContext.setProperty( JMS_RESPONSE, response );
281 			return response;
282 		}
283 		else
284 		{
285 			return new JMSResponse( "", null, null, request, timeStarted );
286 		}
287 	}
288 
289 	private JMSResponse resolveMessage( Request request, long timeStarted, Message messageSend, Message messageReceive )
290 			throws JMSException
291 	{
292 		if( messageReceive instanceof TextMessage )
293 		{
294 			TextMessage textMessageReceive = ( TextMessage )messageReceive;
295 			return new JMSResponse( textMessageReceive.getText(), messageSend, textMessageReceive, request, timeStarted );
296 		}
297 		else if( messageReceive instanceof MapMessage )
298 		{
299 			MapMessage mapMessageReceive = ( MapMessage )messageReceive;
300 			return new JMSResponse( JMSUtils.extractMapMessagePayloadToString( mapMessageReceive ), messageSend,
301 					mapMessageReceive, request, timeStarted );
302 		}
303 		else if( messageReceive instanceof BytesMessage )
304 		{
305 			BytesMessage bytesMessageReceive = ( BytesMessage )messageReceive;
306 			JMSResponse jmsResponse = new JMSResponse( "", messageSend, bytesMessageReceive, request, timeStarted );
307 			addAttachment( request, bytesMessageReceive, jmsResponse );
308 			return jmsResponse;
309 		}
310 		return null;
311 	}
312 
313 	protected Response makeEmptyResponse( SubmitContext submitContext, Request request, long timeStarted,
314 			Message messageSend )
315 	{
316 		JMSResponse response = new JMSResponse( "", messageSend, null, request, timeStarted );
317 		submitContext.setProperty( JMS_RESPONSE, response );
318 		return response;
319 	}
320 
321 	private Message createMessage( SubmitContext submitContext, Request request, Session session ) throws JMSException
322 	{
323 		if( request instanceof WsdlRequest || request instanceof HttpTestRequest || request instanceof RestRequest )
324 		{
325 			if( hasAttachment( request ) )
326 			{
327 				if( isTextAttachment( request ) )
328 				{
329 					return createTextMessageFromAttachment( submitContext, request, session );
330 				}
331 				else
332 				{
333 					return createBytesMessage( request, session );
334 				}
335 			}
336 			else
337 			{
338 				return createTextMessage( submitContext, request, session );
339 			}
340 		}
341 
342 		return null;
343 	}
344 
345 	private Message createTextMessageFromAttachment( SubmitContext submitContext, Request request, Session session )
346 	{
347 		try
348 		{
349 			String content = convertStreamToString( request.getAttachments()[0].getInputStream() );
350 			TextMessage textMessageSend = session.createTextMessage();
351 			String messageBody = PropertyExpander.expandProperties( submitContext, content );
352 			textMessageSend.setText( messageBody );
353 			return textMessageSend;
354 		}
355 		catch( Exception e )
356 		{
357 			SoapUI.logError( e );
358 		}
359 		return null;
360 	}
361 
362 	private String convertStreamToString( InputStream is )
363 	{
364 		BufferedReader reader = new BufferedReader( new InputStreamReader( is ) );
365 		StringBuilder sb = new StringBuilder();
366 
367 		String line = null;
368 		try
369 		{
370 			while( ( line = reader.readLine() ) != null )
371 			{
372 				sb.append( line + "\n" );
373 			}
374 		}
375 		catch( IOException e )
376 		{
377 			e.printStackTrace();
378 		}
379 		finally
380 		{
381 			try
382 			{
383 				is.close();
384 			}
385 			catch( IOException e )
386 			{
387 				e.printStackTrace();
388 			}
389 		}
390 		return sb.toString();
391 	}
392 
393 	private boolean hasAttachment( Request request )
394 	{
395 		if( request.getAttachments().length > 0 )
396 			return true;
397 		return false;
398 	}
399 
400 	private Message createTextMessage( SubmitContext submitContext, Request request, Session session )
401 			throws JMSException
402 	{
403 		TextMessage textMessageSend = session.createTextMessage();
404 		String messageBody = PropertyExpander.expandProperties( submitContext, request.getRequestContent() );
405 		textMessageSend.setText( messageBody );
406 		return textMessageSend;
407 	}
408 
409 	private boolean isTextAttachment( Request request )
410 	{
411 		if( request.getAttachments().length > 0
412 				&& ( request.getAttachments()[0].getContentType().contains( "/text" )
413 						|| request.getAttachments()[0].getContentType().contains( "/xml" ) || request.getAttachments()[0]
414 						.getContentType().contains( "text/plain" ) ) )
415 		{
416 			return true;
417 		}
418 		return false;
419 	}
420 
421 	private Message createBytesMessage( Request request, Session session )
422 	{
423 		try
424 		{
425 			InputStream in = request.getAttachments()[0].getInputStream();
426 			int buff = -1;
427 			ByteArrayOutputStream baos = new ByteArrayOutputStream();
428 			while( ( buff = in.read() ) != -1 )
429 			{
430 				baos.write( buff );
431 			}
432 			BytesMessage bytesMessage = session.createBytesMessage();
433 			bytesMessage.writeBytes( baos.toByteArray() );
434 			return bytesMessage;
435 		}
436 		catch( Exception e )
437 		{
438 			SoapUI.logError( e );
439 		}
440 		return null;
441 	}
442 
443 	private void addAttachment( Request request, BytesMessage bytesMessageReceive, JMSResponse jmsResponse )
444 			throws JMSException
445 	{
446 		try
447 		{
448 			byte[] buff = new byte[1];
449 			File temp = File.createTempFile( "bytesmessage", ".tmp" );
450 			OutputStream out = new FileOutputStream( temp );
451 			while( bytesMessageReceive.readBytes( buff ) != -1 )
452 			{
453 				out.write( buff );
454 			}
455 			out.close();
456 			Attachment[] attachments = new Attachment[] { new RequestFileAttachment( temp, false,
457 					( AbstractHttpRequest<?> )request ) };
458 			jmsResponse.setAttachments( attachments );
459 		}
460 		catch( IOException e )
461 		{
462 			SoapUI.logError( e );
463 		}
464 	}
465 
466 //	protected Connection createConnection( SubmitContext submitContext, Request request,
467 //			ConnectionFactory connectionFactory, Domain domain, String clientId ) throws JMSException
468 //	{
469 //		QueueConnection queueConnection;
470 //		TopicConnection topicConnection;
471 //
472 //		String username = submitContext.expand( request.getUsername() );
473 //		String password = submitContext.expand( request.getPassword() );
474 //
475 //		if( domain.equals( Domain.TOPIC ) )
476 //		{
477 //			topicConnection = StringUtils.hasContent( username ) ? ( ( TopicConnectionFactory )connectionFactory )
478 //					.createTopicConnection( username, password ) : ( ( TopicConnectionFactory )connectionFactory )
479 //					.createTopicConnection();
480 //
481 //			if( !StringUtils.isNullOrEmpty( clientId ) )
482 //				topicConnection.setClientID( clientId );
483 //
484 //			return topicConnection;
485 //		}
486 //		else if( domain.equals( Domain.QUEUE ) )
487 //		{
488 //			queueConnection = StringUtils.hasContent( username ) ? ( ( QueueConnectionFactory )connectionFactory )
489 //					.createQueueConnection( username, password ) : ( ( QueueConnectionFactory )connectionFactory )
490 //					.createQueueConnection();
491 //
492 //			if( !StringUtils.isNullOrEmpty( clientId ) )
493 //				queueConnection.setClientID( clientId );
494 //
495 //			return queueConnection;
496 //		}
497 //		else
498 //		{
499 //			return null;
500 //		}
501 //	}
502 
503 	@SuppressWarnings( "serial" )
504 	public static class UnresolvedJMSEndpointException extends Exception
505 	{
506 		public UnresolvedJMSEndpointException( String msg )
507 		{
508 			super( msg );
509 		}
510 	}
511 
512 }