----------------- Service Broker Objects -------------------------------- CREATE MESSAGE TYPE OrderRequest VALIDATION = WELL_FORMED_XML CREATE MESSAGE TYPE OrderResponse VALIDATION = WELL_FORMED_XML go CREATE CONTRACT OrderContract (OrderRequest SENT BY INITIATOR, OrderResponse SENT BY TARGET) go CREATE QUEUE OrderRequests WITH STATUS = OFF CREATE QUEUE OrderResponses WITH STATUS = ON go CREATE SERVICE OrderRequests ON QUEUE OrderRequests (OrderContract) CREATE SERVICE OrderResponses ON QUEUE OrderResponses (OrderContract) go --------------------- SendAndReceiveOrders ------------------------------------ go -- SendAndReceiveOrders serves as our token external system. It reads all orders -- in OrderImportFileItems and send every order in a message of its own. This is -- only a help procedure for the demo, and is itself not to meant to illustrate -- any best practice or similar. CREATE PROCEDURE SendAndReceiveOrders @WaitTime int AS SET XACT_ABORT, NOCOUNT ON BEGIN TRY DECLARE @FileID int, @RowNo int, @handle uniqueidentifier, @binary varbinary(MAX), @xml xml -- This table is used by FileID/RowNo to conversation handles. DECLARE @FileConversationMap TABLE (FileID int NOT NULL, RowNo int NOT NULL, handle uniqueidentifier NOT NULL UNIQUE, PRIMARY KEY(FileID, RowNo) ) DECLARE cur CURSOR STATIC LOCAL FOR SELECT FileID, RowNo, cast(Data AS varbinary(MAX)) FROM OrderImportFileItems ORDER BY FileID, RowNo OPEN cur WHILE 1 = 1 BEGIN FETCH cur INTO @FileID, @RowNo, @binary IF @@fetch_status <> 0 BREAK BEGIN TRANSACTION BEGIN DIALOG CONVERSATION @handle FROM SERVICE OrderResponses TO SERVICE 'OrderRequests' ON CONTRACT OrderContract WITH ENCRYPTION = OFF ;SEND ON CONVERSATION @handle MESSAGE TYPE OrderRequest (@binary) INSERT @FileConversationMap(FileID, RowNo, handle) VALUES(@FileID, @RowNo, @handle) COMMIT TRANSACTION END DEALLOCATE cur -- All orders sent. Wait for the responses. WHILE 1 = 1 BEGIN WAITFOR (RECEIVE TOP (1) @handle = conversation_handle, @xml = CAST(message_body AS xml) FROM OrderResponses), TIMEOUT @WaitTime IF @@rowcount = 0 BREAK ;WITH shredded_xml AS ( SELECT T.c.value('@OrderID', 'uniqueidentifier') AS OrderID, T.c.value('@Status', 'char(1)') AS Status, T.c.value('@Message', 'nvarchar(2048)') AS Message, T.c.value('@InternalOrderID', 'int') AS InternalOrderID FROM @xml.nodes('OrderResponse') AS T(c) ) UPDATE OrderImportFileItems SET Status = sx.Status, Message = sx.Message, InternalOrderID = sx.InternalOrderID FROM OrderImportFileItems FI JOIN @FileConversationMap FCM ON FCM.FileID = FI.FileID AND FCM.RowNo = FI.RowNo CROSS JOIN shredded_xml sx WHERE FCM.handle = @handle END CONVERSATION @handle END END TRY BEGIN CATCH IF @@trancount > 0 ROLLBACK TRANSACTION EXEC slog.catchhandler_sp @@procid RETURN 1 END CATCH go -- The activation procedures includes a WAITFOR statement which only is there to -- encourage SQL Server to add more queue readers. The more queue readers the -- queue is set up for, the longer each one waits. CREATE FUNCTION get_waittime () RETURNS datetime AS BEGIN DECLARE @max_readers int SELECT @max_readers = max_readers FROM sys.service_queues WHERE name = 'OrderRequests' RETURN (SELECT dateadd(ms, 50 * (@max_readers - 1), '00:00:00')) END go --=============================================================================== --------------------------- DoOneOrderFromActivation ---------------------------- -- This procedure processes a single order, received in the variable @data. It is -- called from the activation procedures. (Of which there are three different ones.) -- The procedure is drawn from the body of the WHILE loop ProcessOneImportFile, -- but there are some differences, due to the context. CREATE PROCEDURE DoOneOrderFromActivation @data xml, @OrderGuidStr nvarchar(400), @OrderID int OUTPUT AS SET XACT_ABORT, NOCOUNT ON BEGIN TRY DECLARE @Operation varchar(100), @OrderGuid uniqueidentifier, @CustomerIDStr nvarchar(400), @CustomerID int, @CampaignCode nvarchar(100), @DetailRows xml, @ProductIDStr nvarchar(400), @ProductID int, @QuantityStr nvarchar(400), @Quantity int, @PriceStr nvarchar(400), @Price decimal(10, 2) DECLARE @Products TABLE (ProductID int NOT NULL) -- Since this procedure is called from activation, there should be a transaction -- that includes the RECEIVE statement. IF @@trancount = 0 BEGIN EXEC slog.catchhandler_sp @@procid, 'This procedure should be called from a transaction' RETURN 55555 END -- Init output parameter. SELECT @OrderID = NULL -- Get data from the Order level. SELECT @Operation = upper(T.c.value('@Operation', 'varchar(100)')), @CustomerIDStr = nullif(T.c.value('@CustomerID', 'nvarchar(400)'), ''), @CampaignCode = ltrim(T.c.value('@CampaignCode', 'nvarchar(100)')), @DetailRows = T.c.query('Products') FROM @data.nodes('Order') AS T(c) -- Attempt to convert the GUID to the correct data type. We must have ROLLBACK -- in the CATCH block, since the transaction is doomed. BEGIN TRY SELECT @OrderGuid = convert(uniqueidentifier, @OrderGuidStr) END TRY BEGIN CATCH ROLLBACK TRANSACTION END CATCH -- Something went wrong, or the order GUID was missing in the first place. IF @OrderGuid IS NULL BEGIN EXEC slog.sqleventlog_sp @@procid, 'Invalid or missing GUID "%1" for order.', @p1 = @OrderGuidStr END -- Attempt to get the customer ID, and raise an error if it fails. Note that NULL -- CustomerID is permitted for some values on Operation. BEGIN TRY SELECT @CustomerID = convert(int, @CustomerIDStr) END TRY BEGIN CATCH ROLLBACK TRANSACTION EXEC slog.sqleventlog_sp @@procid, 'Invalid customer id = "%1".', @p1 = @CustomerIDStr END CATCH -- A token validation of the campaign code. IF len(@CampaignCode) > 6 BEGIN EXEC slog.sqleventlog_sp @@procid, 'Incorrectly formed campaign code.', @p1 = @CampaignCode END -- First handle the header information. For a new order, there is extra -- validation. IF @Operation = 'ADD' BEGIN IF @CustomerID IS NULL BEGIN EXEC slog.sqleventlog_sp @@procid, 'Customer not provided for new order.' END IF NOT EXISTS (SELECT * FROM @DetailRows.nodes('/Products/Product') AS T(c)) BEGIN EXEC slog.sqleventlog_sp @@procid, 'For a new order, there must be at least one product.' END EXEC AddExternalOrder @OrderGuid, @CustomerID, @CampaignCode, @OrderID OUTPUT END ELSE IF @Operation = 'MODIFY' EXEC ModifyExternalOrder @OrderGuid, @CustomerID, @CampaignCode ELSE IF @Operation = 'DELETE' EXEC DeleteExternalOrder @OrderGuid ELSE BEGIN EXEC slog.sqleventlog_sp @@procid, 'Invalid operation %1 for order %2.', @p1 = @Operation, @p2 = @OrderGuid END -- Now loop over the products. (Which is not a very good solution for this -- simple example, but imagine that ModifyExternalOrderDetail is a very complex -- stored procedure.) Again, all values are extracted to strings. DECLARE DetailsCur CURSOR STATIC LOCAL FOR SELECT ProductIDStr = nullif(T.c.value('@ProductID', 'nvarchar(400)'), ''), QuantityStr = nullif(T.c.value('@Quantity', 'nvarchar(400)'), ''), PriceStr = nullif(T.c.value('@Price', 'nvarchar(400)'), '') FROM @DetailRows.nodes('/Products/Product') AS T(c) ORDER BY QuantityStr DESC OPEN DetailsCur WHILE 1 = 1 BEGIN FETCH DetailsCur INTO @ProductIDStr, @QuantityStr, @PriceStr IF @@fetch_status <> 0 BREAK SELECT @ProductID = NULL, @Quantity = NULL, @Price = NULL -- Get numeric values. As in ProcessOneImportFile, there is no TRY-CATCH. -- (On SQL 2012 and later, you would use try_convert() instead.) SELECT @ProductID = convert(int, @ProductIDStr), @Quantity = convert(int, @QuantityStr), @Price = convert(decimal(10, 2), @PriceStr) IF @ProductID IS NULL BEGIN EXEC slog.sqleventlog_sp @@procid, 'Illegal product ID "%1" on detail entry.', @p1 = @ProductIDStr END IF EXISTS (SELECT * FROM @Products WHERE ProductID = @ProductID) BEGIN EXEC slog.sqleventlog_sp @@procid, 'Product %1 appears multiple times for the same order entry.', @p1 = @ProductID END INSERT @Products(ProductID) VALUES(@ProductID) IF @Operation = 'ADD' AND @Quantity = 0 EXEC slog.sqleventlog_sp @@procid, 'Quantity must be > 0 when adding a new order.' EXEC ModifyExternalOrderDetail @OrderGuid, @ProductID, @Quantity = @Quantity, @OrderPrice = @Price END DEALLOCATE DetailsCur END TRY BEGIN CATCH -- The CATCH handler here is a standard one; the activation procedure has -- the fancy stuff. IF @@trancount > 0 ROLLBACK TRANSACTION EXEC slog.catchhandler_sp @@procid RETURN 55555 END CATCH go --================================================================================ -- There are three different activation procedures, demonstrating three ways to -- handle errors when processing a message. -- In ProcessOrders1 we simply use a variable that we set in the error handler, and -- which we look at when we receive the message. This is only good for a single -- queue reader. CREATE PROCEDURE ProcessOrders1 AS SET XACT_ABORT, NOCOUNT ON DECLARE @DialogHandle uniqueidentifier, @MessageType sysname, @binarydata varbinary(MAX), @xmldata xml, @Response xml, @OrderGuidStr nvarchar(400), @OrderID int, @ErrorMessage nvarchar(2048), @ErrorNumber int WHILE 1 = 1 BEGIN TRY -- Init variables, but not @Response. SELECT @DialogHandle = NULL, @MessageType = NULL, @binarydata = NULL, @xmldata = NULL, @OrderGuidStr = NULL, @OrderID = NULL, @ErrorMessage = NULL, @ErrorNumber = NULL -- This is nothing you would or should put in a normal activation procedure, -- but it is here to provoke SQL Server to add more queue readers. DECLARE @WaitTime datetime SELECT @WaitTime = dbo.get_waittime() WAITFOR DELAY @WaitTime BEGIN TRANSACTION -- Get next message of the queue. WAITFOR ( RECEIVE TOP (1) @DialogHandle = conversation_handle, @MessageType = message_type_name, @binarydata = message_body FROM OrderRequests ), TIMEOUT 1000 -- If the queue is exhausted, return. IF @DialogHandle IS NULL BEGIN ROLLBACK TRANSACTION RETURN END -- This is somewhat rough: if the message type is anything but OrderRequest, -- we call it an error. Normally you would check for the predefined message -- types for error and closed conversation here. IF coalesce(@MessageType, '') <> 'OrderRequest' BEGIN EXEC slog.sqleventlog_sp @@procid, 'Unexpected message type "%1".', @p1 = @MessageType, @raiserror = 0 END CONVERSATION @DialogHandle COMMIT TRANSACTION CONTINUE END -- Unless there is a response canned, attempt to process this order. IF @Response IS NULL -- Process this order. Trap any error locally. BEGIN TRY -- Convert data to XML. SELECT @xmldata = convert(xml, @binarydata) -- Extract the OrderGuid here and now. SELECT @OrderGuidStr = @xmldata.value('Order[1]/@OrderID', 'nvarchar(400)') EXEC DoOneOrderFromActivation @xmldata, @OrderGuidStr, @OrderID OUTPUT -- Procedure completed normally, thus send success response. SELECT @Response = (SELECT 'O' AS [@Status], @OrderGuidStr AS [@OrderID], @OrderID AS [@InternalOrderID] FOR XML PATH('OrderResponse'), TYPE) END TRY BEGIN CATCH -- CATCH handler, make sure that the transaction is rolled back. IF @@trancount > 0 ROLLBACK TRANSACTION -- Log error, but do not reraise. Capture error message and error number. EXEC slog.catchhandler_sp @@procid, @reraise = 0, @errno = @ErrorNumber OUTPUT, @errmsg_aug = @ErrorMessage OUTPUT -- Unless it was a deadlock, form an error response. IF @ErrorNumber <> 1205 BEGIN SELECT @Response = (SELECT 'E' AS [@Status], @ErrorMessage AS [@Message], @OrderGuidStr AS [@OrderID] FOR XML PATH('OrderResponse'), TYPE) END END CATCH -- If we still have a transaction at this point, send response back to -- the initiator and clear it. IF @@trancount > 0 BEGIN ; SEND ON CONVERSATION @DialogHandle MESSAGE TYPE OrderResponse (@Response) SELECT @Response = NULL END CONVERSATION @DialogHandle COMMIT TRANSACTION END END TRY BEGIN CATCH -- If some error happens outside the processing, for instance with RECEIVE -- because the queue is disabled, get out and log error here. IF @@trancount > 0 ROLLBACK TRANSACTION EXEC slog.catchhandler_sp @@procid RETURN 1 END CATCH go ----------------------------- ProcessOrders2 ------------------------------------- -- Instead of using a lone scalar variable, we have a table variable where we track -- the conversation handle and the error message. Now we can handle that a message -- with error does not come back directly, which could happen if we use priorities. -- This works better with multiple queue readers, but it is still not a good solution. CREATE PROCEDURE ProcessOrders2 AS SET XACT_ABORT, NOCOUNT ON DECLARE @DialogHandle uniqueidentifier, @MsgSeqNo bigint, @MessageType sysname, @binarydata varbinary(MAX), @xmldata xml, @Response xml, @OrderGuidStr nvarchar(400), @OrderID int, @ErrorMessage nvarchar(2048), @ErrorNumber int -- The table where we log bad messages. We use IGNORE_DUP_KEY, so that we can -- make our error handler simpler. (And error handlers should be simple!) DECLARE @BadMessages TABLE (DialogHandle uniqueidentifier NOT NULL, MsgSeqNo bigint NOT NULL, Response xml NULL, PRIMARY KEY (DialogHandle, MsgSeqNo) WITH (IGNORE_DUP_KEY = ON) ) WHILE 1 = 1 BEGIN TRY -- Clear variables, including @Response. SELECT @DialogHandle = NULL, @MsgSeqNo = NULL, @MessageType = NULL, @binarydata = NULL, @xmldata = NULL, @Response = NULL, @OrderGuidStr = NULL, @OrderID = NULL, @ErrorMessage = NULL, @ErrorNumber = NULL -- Again, a delay to get more queue readers. DECLARE @WaitTime datetime SELECT @WaitTime = dbo.get_waittime() WAITFOR DELAY @WaitTime BEGIN TRANSACTION -- Get the message. Note that we also get the message sequence number to get -- a unique key for the message. The dialog handle alone is not enough. WAITFOR ( RECEIVE TOP (1) @DialogHandle = conversation_handle, @MsgSeqNo = message_sequence_number, @MessageType = message_type_name, @binarydata = message_body FROM OrderRequests ), TIMEOUT 1000 IF @DialogHandle IS NULL BEGIN ROLLBACK TRANSACTION RETURN END IF coalesce(@MessageType, '') <> 'OrderRequest' BEGIN EXEC slog.sqleventlog_sp @@procid, 'Unexpected message type "%1".', @p1 = @MessageType, @raiserror = 0 END CONVERSATION @DialogHandle COMMIT TRANSACTION CONTINUE END -- Any canned response in the error table? SELECT @Response = Response FROM @BadMessages WHERE DialogHandle = @DialogHandle AND MsgSeqNo = @MsgSeqNo IF @Response IS NULL -- If not, try processing the message. BEGIN TRY -- Convert data to XML. SELECT @xmldata = convert(xml, @binarydata) -- Extract the OrderGuid here and now. SELECT @OrderGuidStr = @xmldata.value('Order[1]/@OrderID', 'nvarchar(400)') EXEC DoOneOrderFromActivation @xmldata, @OrderGuidStr, @OrderID OUTPUT SELECT @Response = (SELECT 'O' AS [@Status], @OrderGuidStr AS [@OrderID], @OrderID AS [@InternalOrderID] FOR XML PATH('OrderResponse'), TYPE) END TRY BEGIN CATCH IF @@trancount > 0 ROLLBACK TRANSACTION EXEC slog.catchhandler_sp @@procid, @reraise = 0, @errno = @ErrorNumber OUTPUT, @errmsg_aug = @ErrorMessage OUTPUT -- Store message in error table, unless error was deadlock. IF @ErrorNumber <> 1205 BEGIN INSERT @BadMessages(DialogHandle, MsgSeqNo, Response) SELECT @DialogHandle, @MsgSeqNo, (SELECT 'E' AS [@Status], @ErrorMessage AS [@Message], @OrderGuidStr AS [@OrderID] FOR XML PATH('OrderResponse'), TYPE) END END CATCH -- Send response if there still is an active transaction. IF @@trancount > 0 BEGIN ; SEND ON CONVERSATION @DialogHandle MESSAGE TYPE OrderResponse (@Response) END CONVERSATION @DialogHandle COMMIT TRANSACTION END END TRY BEGIN CATCH IF @@trancount > 0 ROLLBACK TRANSACTION EXEC slog.catchhandler_sp @@procid RETURN 1 END CATCH go -------------------- --------- ProcessOrders3 ------------ ------------------------ -- ProcessOrders3 is the same as ProcessOrders2, except that it uses a permanent -- table instead of a table variable. Therefore it works with multiple queue -- readers - but it is not water-proof! CREATE TABLE BadMessages (DialogHandle uniqueidentifier NOT NULL, MsgSeqNo bigint NOT NULL, Response xml NULL, PRIMARY KEY (DialogHandle, MsgSeqNo) WITH (IGNORE_DUP_KEY = ON) ) go CREATE PROCEDURE ProcessOrders3 AS SET XACT_ABORT, NOCOUNT ON DECLARE @DialogHandle uniqueidentifier, @MsgSeqNo bigint, @MessageType sysname, @binarydata varbinary(MAX), @xmldata xml, @Response xml, @OrderGuidStr nvarchar(400), @OrderID int, @ErrorMessage nvarchar(2048), @ErrorNumber int WHILE 1 = 1 BEGIN TRY SELECT @DialogHandle = NULL, @MsgSeqNo = NULL, @MessageType = NULL, @binarydata = NULL, @xmldata = NULL, @Response = NULL, @OrderGuidStr = NULL, @OrderID = NULL, @ErrorMessage = NULL, @ErrorNumber = NULL DECLARE @WaitTime datetime SELECT @WaitTime = dbo.get_waittime() WAITFOR DELAY @WaitTime BEGIN TRANSACTION WAITFOR ( RECEIVE TOP (1) @DialogHandle = conversation_handle, @MsgSeqNo = message_sequence_number, @MessageType = message_type_name, @binarydata = message_body FROM OrderRequests ), TIMEOUT 1000 IF @DialogHandle IS NULL BEGIN COMMIT TRANSACTION RETURN END IF coalesce(@MessageType, '') <> 'OrderRequest' BEGIN EXEC slog.sqleventlog_sp @@procid, 'Unexpected message type "%1".', @p1 = @MessageType, @raiserror = 0 END CONVERSATION @DialogHandle COMMIT TRANSACTION CONTINUE END SELECT @Response = Response FROM BadMessages WHERE DialogHandle = @DialogHandle AND MsgSeqNo = @MsgSeqNo IF @Response IS NULL BEGIN TRY SELECT @xmldata = convert(xml, @binarydata) SELECT @OrderGuidStr = @xmldata.value('Order[1]/@OrderID', 'nvarchar(400)') EXEC DoOneOrderFromActivation @xmldata, @OrderGuidStr, @OrderID OUTPUT SELECT @Response = (SELECT 'O' AS [@Status], @OrderGuidStr AS [@OrderID], @OrderID AS [@InternalOrderID] FOR XML PATH('OrderResponse'), TYPE) END TRY BEGIN CATCH IF @@trancount > 0 ROLLBACK TRANSACTION EXEC slog.catchhandler_sp @@procid, @reraise = 0, @errno = @ErrorNumber OUTPUT, @errmsg_aug = @ErrorMessage OUTPUT -- Write message to error table. But note that since we have rolled back, -- another process may already have started processing the message. Which -- may be acceptable - it's unlikely that this would happen five times. IF @ErrorNumber <> 1205 BEGIN INSERT BadMessages(DialogHandle, MsgSeqNo, Response) SELECT @DialogHandle, @MsgSeqNo, (SELECT 'E' AS [@Status], @ErrorMessage AS [@Message], @OrderGuidStr AS [@OrderID] FOR XML PATH('OrderResponse'), TYPE) END END CATCH IF @@trancount > 0 BEGIN ; SEND ON CONVERSATION @DialogHandle MESSAGE TYPE OrderResponse (@Response) END CONVERSATION @DialogHandle COMMIT TRANSACTION END END TRY BEGIN CATCH IF @@trancount > 0 ROLLBACK TRANSACTION EXEC slog.catchhandler_sp @@procid RETURN 1 END CATCH go --=============================================================================== -- Testing things out! Use this ALTER QUEUE statement to select which activation -- procedure to use, and how many queue readers you want. (The actual number will -- be lower, but the higher number you choose, the longer the waiting time, and -- SQL Server will create more readers.) ALTER QUEUE OrderRequests WITH STATUS = ON, ACTIVATION (STATUS = ON, PROCEDURE_NAME = ProcessOrders1, MAX_QUEUE_READERS = 1, EXECUTE AS OWNER) go -- Create and receive orders. The parameter to ReceiveOrderResponses is for how -- long to wait to for the last response. If you see orders that are unprocessed, -- but no error about a queue being disabled you need increase the timeout. DECLARE @logid int SELECT @logid = coalesce(MAX(logid), 0) FROM slog.sqleventlog EXEC SendAndReceiveOrders 5000 SELECT * FROM OrderImportFileItems ORDER BY FileID, RowNo SELECT * FROM slog.sqleventlog WHERE logid > @logid ORDER BY logid go ----------------------------------- CLEANUP! ------------------------------------- DECLARE @sql nvarchar(MAX) SELECT @sql = (SELECT DISTINCT 'END CONVERSATION ' + quotename(convert(char(36), conversation_handle), '''') + ' WITH CLEANUP;' FROM sys.conversation_endpoints FOR XML PATH(''), TYPE).value('.', 'nvarchar(MAX)') EXEC (@sql) go DROP TABLE BadMessages DROP SERVICE OrderRequests DROP SERVICE OrderResponses DROP QUEUE OrderRequests DROP QUEUE OrderResponses DROP FUNCTION get_waittime DROP PROCEDURE DoOneOrderFromActivation DROP PROCEDURE ProcessOrders1 DROP PROCEDURE ProcessOrders2 DROP PROCEDURE ProcessOrders3 DROP PROCEDURE SendAndReceiveOrders DROP CONTRACT OrderContract DROP MESSAGE TYPE OrderRequest DROP MESSAGE TYPE OrderResponse go