Assuming you have your source and target databases, you can start by setting up the service broker. You'll have to familiarise yourself with it, so you can set up the queues and contracts etc - there's too much to write here.
Then you need triggers for insert, update and delete that capture the transaction data, create an xml message, and post the message on the queue. The easiest way is to create a stored proc that takes the xml as input and creates a message from it. The trigger can generate the xml easily enough along the lines of:
SET @xml = (
SELECT *
-- use 'deleted' for deletes
FROM inserted AS
FOR
XML AUTO ,ROOT('root')
)
Your target database then needs a master stored proc that consumes the messages (i.e. it is automatically fired whenever a message gets posted to the queue). This proc should look at the xml, decide which table is affected, then run a stored proc specific to that table for consuming the transaction data.
It all seems like a lot of work, but it's scriptable! Below is a script I wrote for doing replication via the service broker. DISCLAIMER NOTE - it requires the source and target databases to have identical schemas, and sets up the objects based on the common tables. The databases both need to be on the same instance. If you want to use different instances you'll have to adapt the script. Hopefully it'll give you all the logic you need to get started though.
USE master
GO
DECLARE @LiveDb sysname = N''
,@RptDb sysname = N''
-- set up the service broker on Live first
DECLARE
@sbguidsrc UNIQUEIDENTIFIER
,@sbguidtgt UNIQUEIDENTIFIER
,@sbflag BIT
,@sql NVARCHAR(MAX)
SELECT
@sbguidsrc = service_broker_guid
,@sbflag = is_broker_enabled
FROM
sys.databases
WHERE
[name] = @LiveDb
SET @sql = N'
ALTER DATABASE '+@LiveDb+N' SET TRUSTWORTHY ON WITH ROLLBACK IMMEDIATE'
EXEC sp_executesql @sql
IF @sbflag = 0
BEGIN
SET @sql = N'
ALTER DATABASE '+@LiveDb+N' SET NEW_BROKER WITH ROLLBACK IMMEDIATE'
EXEC sp_executesql @sql
SELECT
@sbguidsrc = service_broker_guid
,@sbflag = is_broker_enabled
FROM
sys.databases
WHERE
[name] = @LiveDb
IF @sbflag = 0
RAISERROR('Cannot enable broker on %d', 16, 1, @LiveDb) WITH nowait
END
-- set up the service broker on Reporting db with (almost) the same details
SELECT
@sbguidtgt = service_broker_guid
,@sbflag = is_broker_enabled
FROM
sys.databases
WHERE
[name] = @RptDb
SET @sql = N'
ALTER DATABASE '+@RptDb+N' SET TRUSTWORTHY ON WITH ROLLBACK IMMEDIATE'
EXEC sp_executesql @sql
IF @sbflag = 0
BEGIN
SET @sql = N'
ALTER DATABASE '+@RptDb+N' SET NEW_BROKER WITH ROLLBACK IMMEDIATE'
EXEC sp_executesql @sql
SELECT
@sbguidtgt = service_broker_guid
,@sbflag = is_broker_enabled
FROM
sys.databases
WHERE
[name] = @RptDb
IF @sbflag = 0
RAISERROR('Cannot enable broker on %d', 16, 1, @RptDb) WITH nowait
END
-- now create service broker objects on both databases
SET @sql = N'
USE '+@LiveDb+N'
IF EXISTS ( SELECT
*
FROM
sys.routes
WHERE
name = N''RT_ReportingReplication_'+@LiveDb+N''' )
DROP ROUTE [RT_ReportingReplication_'+@LiveDb+N']
IF EXISTS ( SELECT
*
FROM
sys.services
WHERE
name = N''SVC_ReportingReplication_'+@LiveDb+N''' )
DROP SERVICE [SVC_ReportingReplication_'+@LiveDb+N']
IF EXISTS ( SELECT
*
FROM
sys.service_queues
WHERE
name = N''Q_ReportingReplication_'+@LiveDb+N''' )
DROP QUEUE [dbo].[Q_ReportingReplication_'+@LiveDb+N']
IF EXISTS ( SELECT
*
FROM
sys.service_contracts
WHERE
name = N''CTRCT_ReportingReplication_'+@LiveDb+N''' )
DROP CONTRACT [CTRCT_ReportingReplication_'+@LiveDb+N']
IF EXISTS ( SELECT
*
FROM
sys.service_message_types
WHERE
name = N''MSG_ReportingReplication_'+@LiveDb+N''' )
DROP MESSAGE TYPE [MSG_ReportingReplication_'+@LiveDb+N']
CREATE MESSAGE TYPE MSG_ReportingReplication_'+@LiveDb+N'
CREATE CONTRACT CTRCT_ReportingReplication_'+@LiveDb+N'
(MSG_ReportingReplication_'+@LiveDb+N' SENT BY ANY)
CREATE QUEUE Q_ReportingReplication_'+@LiveDb+N' WITH STATUS = ON ;
CREATE SERVICE SVC_ReportingReplication_'+@LiveDb+N'
ON QUEUE Q_ReportingReplication_'+@LiveDb+N' (CTRCT_ReportingReplication_'+@LiveDb+N')
CREATE ROUTE RT_ReportingReplication_'+@LiveDb+N'
WITH SERVICE_NAME=''SVC_ReportingReplication_'+@LiveDb+N'''
,BROKER_INSTANCE=''' + CAST(@sbguidtgt AS NVARCHAR(36)) + '''
,ADDRESS=''LOCAL'''
--PRINT @sql
EXEC sp_executesql @sql
SET @sql = N'
USE '+@RptDb+N'
IF EXISTS ( SELECT
*
FROM
sys.routes
WHERE
name = N''RT_ReportingReplication_'+@RptDb+N''' )
DROP ROUTE [RT_ReportingReplication_'+@RptDb+N']
IF EXISTS ( SELECT
*
FROM
sys.services
WHERE
name = N''SVC_ReportingReplication_'+@RptDb+N''' )
DROP SERVICE [SVC_ReportingReplication_'+@RptDb+N']
IF EXISTS ( SELECT
*
FROM
sys.service_queues
WHERE
name = N''Q_ReportingReplication_'+@RptDb+N''' )
DROP QUEUE [dbo].[Q_ReportingReplication_'+@RptDb+N']
IF EXISTS ( SELECT
*
FROM
sys.service_contracts
WHERE
name = N''CTRCT_ReportingReplication_'+@LiveDb+N''' )
DROP CONTRACT [CTRCT_ReportingReplication_'+@LiveDb+N']
IF EXISTS ( SELECT
*
FROM
sys.service_message_types
WHERE
name = N''MSG_ReportingReplication_'+@LiveDb+N''' )
DROP MESSAGE TYPE [MSG_ReportingReplication_'+@LiveDb+N']
CREATE MESSAGE TYPE MSG_ReportingReplication_'+@LiveDb+N'
CREATE CONTRACT CTRCT_ReportingReplication_'+@LiveDb+N' -- keep same name as on Live database
(MSG_ReportingReplication_'+@LiveDb+N' SENT BY ANY)
CREATE QUEUE Q_ReportingReplication_'+@RptDb+N' WITH STATUS = ON ;
CREATE SERVICE SVC_ReportingReplication_'+@RptDb+N'
ON QUEUE Q_ReportingReplication_'+@RptDb+N' (CTRCT_ReportingReplication_'+@LiveDb+N')
CREATE ROUTE RT_ReportingReplication_'+@RptDb+N'
WITH SERVICE_NAME=''SVC_ReportingReplication_'+@RptDb+N'''
,BROKER_INSTANCE=''' + CAST(@sbguidsrc AS NVARCHAR(36)) + '''
,ADDRESS=''LOCAL'''
--PRINT @sql
EXEC sp_executesql @sql
-- now create the stored proc in Live db to send the data to the reporting database
SET @sql = N'
USE '+@LiveDb+N'
IF EXISTS ( SELECT
*
FROM
sys.objects
WHERE
name = ''procSendReplicationData''
AND type = ''P'' )
BEGIN
DROP PROCEDURE [procSendReplicationData]
END
'
EXEC sp_executesql @sql
SET @sql = N'
EXEC '+@LiveDb+N'.sys.sp_executesql N''
CREATE PROCEDURE procSendReplicationData (@xmlData XML)
AS
BEGIN
DECLARE @handle UNIQUEIDENTIFIER
BEGIN DIALOG CONVERSATION @handle
FROM SERVICE SVC_ReportingReplication_'+@LiveDb+N'
TO SERVICE ''''SVC_ReportingReplication_'+@RptDb+N'''''
ON CONTRACT CTRCT_ReportingReplication_'+@LiveDb+N'
WITH ENCRYPTION=OFF ;
SEND ON CONVERSATION @handle
MESSAGE TYPE MSG_ReportingReplication_'+@LiveDb+N' (@xmlData)
-- debug: remove next line
-- INSERT INTO '+@RptDb+N'.dbo.ErrorLog
-- (code, message)
-- VALUES
-- (0, -- code - int
-- CAST(@xmlData AS NVARCHAR(MAX)) -- message - nvarchar(max)
-- )
--SELECT @xmlData
END'''
EXEC sp_executesql @sql
-- create a trigger on each table for which data is to be replicated. Inserts and Updates should
-- be kept separate to deletes so extra triggers may be needed if deletions can occur.
-- NB: we only add a trigger to the tables existing in both databases
DECLARE @table sysname
,@tid INT
,@updList NVARCHAR(MAX)
,@fromList NVARCHAR(MAX)
,@insList NVARCHAR(MAX)
,@pkJoin NVARCHAR(MAX)
,@spMastsql NVARCHAR(MAX)
,@isIdent BIT
SET @spMastsql = N'
USE '+@RptDb+N'
IF EXISTS ( SELECT
*
FROM
sys.objects
WHERE
name = ''procApplyTransactions_Master''
AND type = ''P'' )
BEGIN
DROP PROCEDURE [procApplyTransactions_Master]
END'
EXEC sp_executesql @spMastsql
SET @spMastsql = N'
EXEC '+@RptDb+N'.sys.sp_executesql N''
CREATE PROCEDURE procApplyTransactions_Master
AS
BEGIN
DECLARE
@xml XML
,@handle UNIQUEIDENTIFIER
,@hDoc INT
WHILE 1 = 1
BEGIN
SELECT
@xml = NULL
,@handle = NULL
WAITFOR (RECEIVE TOP (1) @handle=conversation_handle, @xml=CAST(message_body AS XML)
FROM Q_ReportingReplication_'+@RptDb+N'), TIMEOUT 10000
IF @handle IS NULL
BREAK
EXEC sp_xml_preparedocument
@hDoc OUTPUT
,@xml
'
SET @sql = N'
DECLARE trigTables CURSOR FOR
SELECT l.name, l.object_id
FROM '+@LiveDb+N'.sys.tables AS l
INNER JOIN '+@RptDb+N'.sys.tables AS r
ON l.name = r.name
AND SCHEMA_NAME(l.schema_id) = SCHEMA_NAME(r.schema_id)
WHERE l.type = ''U''
AND r.type = ''U'''
EXEC sp_executesql @sql
OPEN trigTables
FETCH NEXT FROM trigTables
INTO @table, @tid
WHILE @@FETCH_STATUS = 0
BEGIN
-- get primary key info
SET @sql = N'
SET @pk = NULL;
SELECT @pk = ISNULL(@pk+N''AND '',N'''')+N''a.''+QUOTENAME(c.name)+N'' = '+@table+N'.''+QUOTENAME(c.name)
FROM '+@LiveDb+N'.sys.key_constraints AS kc
INNER JOIN '+@LiveDb+N'.sys.index_columns AS ic
ON kc.parent_object_id = ic.object_id
AND kc.unique_index_id = ic.index_id
INNER JOIN '+@LiveDb+N'.sys.columns AS c
ON ic.object_id = c.object_id
AND ic.column_id = c.column_id
WHERE kc.parent_object_id = @tableID
AND kc.type = ''PK'''
EXEC sp_executesql @stmt = @sql
,@parameters = N'@pk NVARCHAR(MAX) OUTPUT, @tableID INT'
,@pk = @pkJoin OUTPUT
,@tableID = @tid
-- generate the column lists needed for the replication procs
SET @sql = N'
SET @upd = NULL;
SELECT @upd = ISNULL(@upd+N''
,'',N'''')+QUOTENAME(c.name)+N'' = a.''+QUOTENAME(c.name)
FROM '+@LiveDb+N'.sys.columns AS c
INNER JOIN '+@LiveDb+N'.sys.types AS t
ON c.user_type_id = t.user_type_id
AND c.system_type_id = t.system_type_id
WHERE c.object_id = @tableID
AND t.name != N''timestamp''
AND CHARINDEX(QUOTENAME(c.name),@pk) = 0
ORDER BY c.column_id'
EXEC sp_executesql @stmt = @sql
,@parameters = N'@pk NVARCHAR(MAX), @tableID INT, @upd NVARCHAR(MAX) OUTPUT'
,@pk = @pkJoin
,@tableID = @tid
,@upd = @updList OUTPUT
SET @sql = N'
SET @ins = NULL;
SET @from = NULL;
SELECT @ins = ISNULL(@ins+N''
,'',N'''')+QUOTENAME(c.name)
,@from = ISNULL(@from+N''
,'',N'''')+QUOTENAME(c.name)+N'' ''+t.name
+CASE WHEN t.name IN (N''CHAR'',N''VARCHAR'',N''NCHAR'',N''NVARCHAR'',N''BINARY'',N''VARBINARY'')
THEN N''(''+CASE WHEN c.max_length = -1 THEN N''MAX'' ELSE CAST(c.max_length AS NVARCHAR) END+N'')''
WHEN t.name = N''DECIMAL'' THEN N''(''+CAST(c.precision AS NVARCHAR)+N'',''+CAST(c.scale AS NVARCHAR)+N'')''
ELSE N'''' END
+N'' ''''''''@''+c.name+N''''''''''''
FROM '+@LiveDb+N'.sys.columns AS c
INNER JOIN '+@LiveDb+N'.sys.types AS t
ON c.user_type_id = t.user_type_id
AND c.system_type_id = t.system_type_id
WHERE c.object_id = @tableID
AND t.name != N''timestamp''
ORDER BY c.column_id'
EXEC sp_executesql @stmt = @sql
,@parameters = N'@tableID INT, @ins NVARCHAR(MAX) OUTPUT, @from NVARCHAR(MAX) OUTPUT'
,@tableID = @tid
,@ins = @insList OUTPUT
,@from = @fromList OUTPUT
-- find out if there are any identity columns
SET @sql = N'
SELECT @ident = CAST(MAX(CAST(is_identity AS INT)) AS BIT)
FROM '+@LiveDb+N'.sys.columns
WHERE object_id = @tableID'
EXEC sp_executesql @stmt = @sql
,@parameters = N'@tableID INT, @ident BIT OUTPUT'
,@tableID = @tid
,@ident = @isIdent OUTPUT
-- build the sql needed to create the trigger and proc for each table
SET @sql = N'
USE '+@LiveDb+N'
IF EXISTS ( SELECT
*
FROM
sys.objects
WHERE
name = ''TR_'+@table+N'_UI''
AND type = ''TR'' )
BEGIN
DROP TRIGGER [TR_'+@table+N'_UI]
END'
EXEC sp_executesql @sql
SET @sql = N'
EXEC '+@LiveDb+N'.sys.sp_executesql N''
CREATE TRIGGER TR_'+@table+N'_UI ON dbo.'+@table+N'
FOR INSERT, UPDATE
AS
BEGIN
DECLARE @xml XML
SET @xml = (
SELECT
*
FROM
inserted AS '+@table+N'
FOR
XML AUTO
,ROOT(''''root'''')
)
-- debug: remove next line
--SELECT @xml
IF @xml IS NOT NULL
EXEC procSendReplicationData
@xml
END'''
EXEC sp_executesql @sql
SET @sql = N'
USE '+@RptDb+N'
IF EXISTS ( SELECT
*
FROM
sys.objects
WHERE
name = ''procApplyTransactions_'+@table+N'''
AND type = ''P'' )
BEGIN
DROP PROCEDURE [procApplyTransactions_'+@table+N']
END'
EXEC sp_executesql @sql
SET @sql = N'
EXEC '+@RptDb+N'.sys.sp_executesql N''
CREATE PROCEDURE procApplyTransactions_'+@table+N' (@xml XML)
AS
BEGIN
DECLARE @hDoc INT
EXEC sp_xml_preparedocument
@hDoc OUTPUT
,@xml
UPDATE
'+@table+N'
SET
'+@updList+N'
FROM
OPENXML (@hDoc, ''''/root/'+@table+N''''')
WITH ('+@fromList+N') a
WHERE
'+@pkJoin+N'
'+CASE WHEN @isIdent = 1 THEN N'SET IDENTITY_INSERT '+@table+N' ON' ELSE N'' END+N'
INSERT INTO '+@table+N'
('+@insList+N')
SELECT
'+@insList+N'
FROM
OPENXML (@hDoc, ''''/root/'+@table+N''''')
WITH ('+@fromList+N') a
WHERE
NOT EXISTS ( SELECT
*
FROM
'+@table+N'
WHERE
'+@pkJoin+N' )
'+CASE WHEN @isIdent = 1 THEN N'SET IDENTITY_INSERT '+@table+N' OFF' ELSE N'' END+N'
EXEC sp_xml_removedocument
@hDoc
END'''
--print @sql
EXEC sp_executesql @sql
-- add to master proc
SET @spMastsql = @spMastsql + N'
IF EXISTS ( SELECT
*
FROM
OPENXML(@hDoc, ''''/root/'+@table+N''''') )
BEGIN
EXEC procApplyTransactions_'+@table+N'
@xml
END
ELSE '
FETCH NEXT FROM trigTables
INTO @table, @tid
END
CLOSE trigTables
DEALLOCATE trigTables
SET @spMastsql = LEFT(@spMastsql,LEN(@spMastsql)-4) + N'
EXEC sp_xml_removedocument
@hDoc
END CONVERSATION @handle
END
END'''
--select @spMastsql
EXEC sp_executesql @spMastsql
-- create proc to deal with messages on source queue
SET @sql = N'
USE '+@LiveDb+N'
IF EXISTS ( SELECT *
FROM sys.objects
WHERE name = ''procReceiveBrokerResponse''
AND type = ''P''
AND SCHEMA_NAME(schema_id) = ''dbo'' )
BEGIN
DROP PROCEDURE dbo.procReceiveBrokerResponse
END
'
EXEC sp_executesql @sql
SET @sql = N'
EXEC '+@LiveDb+N'.sys.sp_executesql N''
CREATE PROCEDURE dbo.procReceiveBrokerResponse
AS
DECLARE
@dh UNIQUEIDENTIFIER
,@message_type SYSNAME
,@message_body NVARCHAR(4000)
BEGIN TRANSACTION
WAITFOR (
RECEIVE TOP(1) @dh = [conversation_handle],
@message_type = [message_type_name],
@message_body = CAST([message_body] AS NVARCHAR(4000))
FROM Q_ReportingReplication_'+@LiveDb+N'), TIMEOUT 1000;
WHILE @dh IS NOT NULL
BEGIN
IF @message_type = N''''http://schemas.microsoft.com/SQL/ServiceBroker/Error''''
BEGIN
RAISERROR (N''''Received error %s from service [Target]'''', 10, 1, @message_body) WITH LOG
END
END CONVERSATION @dh
COMMIT
SELECT
@dh = NULL
BEGIN TRANSACTION
WAITFOR (
RECEIVE TOP(1) @dh = [conversation_handle],
@message_type = [message_type_name],
@message_body = CAST([message_body] AS NVARCHAR(4000))
FROM Q_ReportingReplication_'+@LiveDb+N'), TIMEOUT 1000;
END
COMMIT''
'
EXEC sp_executesql @sql
-- alter source queue to process responses
SET @sql = N'
USE '+@LiveDb+N'
ALTER QUEUE Q_ReportingReplication_'+@LiveDb+N'
WITH STATUS = ON
,ACTIVATION (
STATUS = ON,
MAX_QUEUE_READERS = 1,
PROCEDURE_NAME = procReceiveBrokerResponse,
EXECUTE AS OWNER) ;'
EXEC sp_executesql @sql
-- finally alter the destination queue to activate the receiving proc when a message is received
SET @sql = N'
USE '+@RptDb+N'
ALTER QUEUE Q_ReportingReplication_'+@RptDb+N'
WITH STATUS = ON
,ACTIVATION (PROCEDURE_NAME = dbo.procApplyTransactions_Master
,MAX_QUEUE_READERS = 1
,EXECUTE AS OWNER
,STATUS = ON)'
EXEC sp_executesql @sql
.
social.technet.microsoft.com/Forums
No comments:
Post a Comment