
Tuesday, May 14, 2013

[SQL Server Data Warehousing] Need help with Change Data Capture


Assuming you have your source and target databases, start by setting up Service Broker.  You'll need to familiarise yourself with it so that you can set up the message types, contracts, queues and services; there's too much to cover here.


Then you need triggers for insert, update and delete that capture the transaction data, build an XML message, and post it to the queue.  The easiest way is to create a stored proc that takes the XML as input and sends it as a message.  The trigger can generate the XML easily enough, along the lines of:



SET @xml = (
SELECT *
-- use 'deleted' for deletes
FROM inserted AS MyTable -- placeholder alias: use the table's own name, XML AUTO turns it into the element name
FOR
XML AUTO ,ROOT('root')
)
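
For deletes, the same pattern applies with the deleted pseudo-table. Here is a minimal sketch, assuming a hypothetical table dbo.Customer and the procSendReplicationData sender proc that the script further down creates. The ROOT('deleted') name is my own assumption, chosen so the consumer can tell deletes apart from inserts and updates. The consumer procs in the script only look under /root, so they would need a matching handler.

```sql
-- Hypothetical table name dbo.Customer; procSendReplicationData is the
-- sender proc created by the script in this post.
CREATE TRIGGER TR_Customer_D ON dbo.Customer
FOR DELETE
AS
BEGIN
    SET NOCOUNT ON;

    DECLARE @xml XML;

    -- Alias the pseudo-table with the real table name so that XML AUTO
    -- names each row element <Customer ... />.
    -- ROOT('deleted') is an assumed convention for marking delete messages.
    SET @xml = (
        SELECT *
        FROM deleted AS Customer
        FOR XML AUTO, ROOT('deleted')
    );

    -- @xml is NULL when the DELETE statement touched no rows.
    IF @xml IS NOT NULL
        EXEC procSendReplicationData @xml;
END
```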

Your target database then needs a master stored proc that consumes the messages (i.e. it is automatically fired whenever a message gets posted to the queue).  This proc should look at the xml, decide which table is affected, then run a stored proc specific to that table for consuming the transaction data.
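
One way the master proc could work out which table a message belongs to is to read the name of the first element under the root. This is a rough sketch, assuming the attribute-centric XML that FOR XML AUTO, ROOT('root') produces and a hypothetical Customer table. The script further down does the same job differently, with one OPENXML check per table.

```sql
-- Sample message shaped like the trigger output; Customer is a hypothetical table.
DECLARE @xml XML = N'<root><Customer Id="1" Name="Ann"/></root>';
DECLARE @table sysname, @proc sysname;

-- The first child of <root> carries the table alias used in the trigger.
SET @table = @xml.value('local-name((/root/*)[1])', 'nvarchar(128)');

-- The procApplyTransactions_<table> naming follows the script in this post.
SET @proc = N'dbo.' + QUOTENAME(N'procApplyTransactions_' + @table);

IF OBJECT_ID(@proc, N'P') IS NOT NULL
    EXEC @proc @xml;    -- run the table-specific consumer
ELSE
    RAISERROR(N'No consumer proc for table %s', 10, 1, @table) WITH NOWAIT;
```

Run on its own, this just reports that no consumer proc exists. Inside the activated proc the @xml value would come from RECEIVE instead of a literal.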


It all seems like a lot of work, but it's scriptable!  Below is a script I wrote for doing replication via Service Broker.  Disclaimer: it requires the source and target databases to have identical schemas, and it sets up the objects based on the tables common to both.  Both databases need to be on the same instance; if you want to use different instances you'll have to adapt the script.  Note also that it only creates insert/update triggers, so deletes need extra triggers.  Hopefully it'll give you all the logic you need to get started though.



USE master
GO
DECLARE @LiveDb sysname = N'' -- name of the source (Live) database
,@RptDb sysname = N'' -- name of the target (Reporting) database

-- set up the service broker on Live first
DECLARE
@sbguidsrc UNIQUEIDENTIFIER
,@sbguidtgt UNIQUEIDENTIFIER
,@sbflag BIT
,@sql NVARCHAR(MAX)

SELECT
@sbguidsrc = service_broker_guid
,@sbflag = is_broker_enabled
FROM
sys.databases
WHERE
[name] = @LiveDb


SET @sql = N'
ALTER DATABASE '+@LiveDb+N' SET TRUSTWORTHY ON WITH ROLLBACK IMMEDIATE'

EXEC sp_executesql @sql

IF @sbflag = 0
BEGIN
SET @sql = N'
ALTER DATABASE '+@LiveDb+N' SET NEW_BROKER WITH ROLLBACK IMMEDIATE'

EXEC sp_executesql @sql

SELECT
@sbguidsrc = service_broker_guid
,@sbflag = is_broker_enabled
FROM
sys.databases
WHERE
[name] = @LiveDb

IF @sbflag = 0
RAISERROR('Cannot enable broker on %s', 16, 1, @LiveDb) WITH nowait
END

-- set up the service broker on Reporting db with (almost) the same details
SELECT
@sbguidtgt = service_broker_guid
,@sbflag = is_broker_enabled
FROM
sys.databases
WHERE
[name] = @RptDb

SET @sql = N'
ALTER DATABASE '+@RptDb+N' SET TRUSTWORTHY ON WITH ROLLBACK IMMEDIATE'

EXEC sp_executesql @sql

IF @sbflag = 0
BEGIN
SET @sql = N'
ALTER DATABASE '+@RptDb+N' SET NEW_BROKER WITH ROLLBACK IMMEDIATE'

EXEC sp_executesql @sql

SELECT
@sbguidtgt = service_broker_guid
,@sbflag = is_broker_enabled
FROM
sys.databases
WHERE
[name] = @RptDb

IF @sbflag = 0
RAISERROR('Cannot enable broker on %s', 16, 1, @RptDb) WITH nowait
END

-- now create service broker objects on both databases
SET @sql = N'
USE '+@LiveDb+N'

IF EXISTS ( SELECT
*
FROM
sys.routes
WHERE
name = N''RT_ReportingReplication_'+@LiveDb+N''' )
DROP ROUTE [RT_ReportingReplication_'+@LiveDb+N']
IF EXISTS ( SELECT
*
FROM
sys.services
WHERE
name = N''SVC_ReportingReplication_'+@LiveDb+N''' )
DROP SERVICE [SVC_ReportingReplication_'+@LiveDb+N']
IF EXISTS ( SELECT
*
FROM
sys.service_queues
WHERE
name = N''Q_ReportingReplication_'+@LiveDb+N''' )
DROP QUEUE [dbo].[Q_ReportingReplication_'+@LiveDb+N']
IF EXISTS ( SELECT
*
FROM
sys.service_contracts
WHERE
name = N''CTRCT_ReportingReplication_'+@LiveDb+N''' )
DROP CONTRACT [CTRCT_ReportingReplication_'+@LiveDb+N']
IF EXISTS ( SELECT
*
FROM
sys.service_message_types
WHERE
name = N''MSG_ReportingReplication_'+@LiveDb+N''' )
DROP MESSAGE TYPE [MSG_ReportingReplication_'+@LiveDb+N']

CREATE MESSAGE TYPE MSG_ReportingReplication_'+@LiveDb+N'

CREATE CONTRACT CTRCT_ReportingReplication_'+@LiveDb+N'
(MSG_ReportingReplication_'+@LiveDb+N' SENT BY ANY)

CREATE QUEUE Q_ReportingReplication_'+@LiveDb+N' WITH STATUS = ON ;

CREATE SERVICE SVC_ReportingReplication_'+@LiveDb+N'
ON QUEUE Q_ReportingReplication_'+@LiveDb+N' (CTRCT_ReportingReplication_'+@LiveDb+N')

CREATE ROUTE RT_ReportingReplication_'+@LiveDb+N'
WITH SERVICE_NAME=''SVC_ReportingReplication_'+@LiveDb+N'''
,BROKER_INSTANCE=''' + CAST(@sbguidtgt AS NVARCHAR(36)) + '''
,ADDRESS=''LOCAL'''
--PRINT @sql
EXEC sp_executesql @sql

SET @sql = N'
USE '+@RptDb+N'

IF EXISTS ( SELECT
*
FROM
sys.routes
WHERE
name = N''RT_ReportingReplication_'+@RptDb+N''' )
DROP ROUTE [RT_ReportingReplication_'+@RptDb+N']
IF EXISTS ( SELECT
*
FROM
sys.services
WHERE
name = N''SVC_ReportingReplication_'+@RptDb+N''' )
DROP SERVICE [SVC_ReportingReplication_'+@RptDb+N']
IF EXISTS ( SELECT
*
FROM
sys.service_queues
WHERE
name = N''Q_ReportingReplication_'+@RptDb+N''' )
DROP QUEUE [dbo].[Q_ReportingReplication_'+@RptDb+N']
IF EXISTS ( SELECT
*
FROM
sys.service_contracts
WHERE
name = N''CTRCT_ReportingReplication_'+@LiveDb+N''' )
DROP CONTRACT [CTRCT_ReportingReplication_'+@LiveDb+N']
IF EXISTS ( SELECT
*
FROM
sys.service_message_types
WHERE
name = N''MSG_ReportingReplication_'+@LiveDb+N''' )
DROP MESSAGE TYPE [MSG_ReportingReplication_'+@LiveDb+N']

CREATE MESSAGE TYPE MSG_ReportingReplication_'+@LiveDb+N'

CREATE CONTRACT CTRCT_ReportingReplication_'+@LiveDb+N' -- keep same name as on Live database
(MSG_ReportingReplication_'+@LiveDb+N' SENT BY ANY)

CREATE QUEUE Q_ReportingReplication_'+@RptDb+N' WITH STATUS = ON ;

CREATE SERVICE SVC_ReportingReplication_'+@RptDb+N'
ON QUEUE Q_ReportingReplication_'+@RptDb+N' (CTRCT_ReportingReplication_'+@LiveDb+N')

CREATE ROUTE RT_ReportingReplication_'+@RptDb+N'
WITH SERVICE_NAME=''SVC_ReportingReplication_'+@RptDb+N'''
,BROKER_INSTANCE=''' + CAST(@sbguidsrc AS NVARCHAR(36)) + '''
,ADDRESS=''LOCAL'''
--PRINT @sql
EXEC sp_executesql @sql

-- now create the stored proc in Live db to send the data to the reporting database
SET @sql = N'
USE '+@LiveDb+N'

IF EXISTS ( SELECT
*
FROM
sys.objects
WHERE
name = ''procSendReplicationData''
AND type = ''P'' )
BEGIN
DROP PROCEDURE [procSendReplicationData]
END
'

EXEC sp_executesql @sql

SET @sql = N'
EXEC '+@LiveDb+N'.sys.sp_executesql N''
CREATE PROCEDURE procSendReplicationData (@xmlData XML)
AS
BEGIN
DECLARE @handle UNIQUEIDENTIFIER

BEGIN DIALOG CONVERSATION @handle
FROM SERVICE SVC_ReportingReplication_'+@LiveDb+N'
TO SERVICE ''''SVC_ReportingReplication_'+@RptDb+N'''''
ON CONTRACT CTRCT_ReportingReplication_'+@LiveDb+N'
WITH ENCRYPTION=OFF ;

SEND ON CONVERSATION @handle
MESSAGE TYPE MSG_ReportingReplication_'+@LiveDb+N' (@xmlData)

-- debug: remove next line
-- INSERT INTO '+@RptDb+N'.dbo.ErrorLog
-- (code, message)
-- VALUES
-- (0, -- code - int
-- CAST(@xmlData AS NVARCHAR(MAX)) -- message - nvarchar(max)
-- )
--SELECT @xmlData
END'''

EXEC sp_executesql @sql

-- create a trigger on each table for which data is to be replicated. Inserts and Updates should
-- be kept separate to deletes so extra triggers may be needed if deletions can occur.

-- NB: we only add a trigger to the tables existing in both databases
DECLARE @table sysname
,@tid INT
,@updList NVARCHAR(MAX)
,@fromList NVARCHAR(MAX)
,@insList NVARCHAR(MAX)
,@pkJoin NVARCHAR(MAX)
,@spMastsql NVARCHAR(MAX)
,@isIdent BIT


SET @spMastsql = N'
USE '+@RptDb+N'
IF EXISTS ( SELECT
*
FROM
sys.objects
WHERE
name = ''procApplyTransactions_Master''
AND type = ''P'' )
BEGIN
DROP PROCEDURE [procApplyTransactions_Master]
END'

EXEC sp_executesql @spMastsql

SET @spMastsql = N'
EXEC '+@RptDb+N'.sys.sp_executesql N''
CREATE PROCEDURE procApplyTransactions_Master
AS
BEGIN
DECLARE
@xml XML
,@handle UNIQUEIDENTIFIER
,@hDoc INT

WHILE 1 = 1
BEGIN
SELECT
@xml = NULL
,@handle = NULL

WAITFOR (RECEIVE TOP (1) @handle=conversation_handle, @xml=CAST(message_body AS XML)
FROM Q_ReportingReplication_'+@RptDb+N'), TIMEOUT 10000

IF @handle IS NULL
BREAK

EXEC sp_xml_preparedocument
@hDoc OUTPUT
,@xml

'

SET @sql = N'
DECLARE trigTables CURSOR FOR
SELECT l.name, l.object_id
FROM '+@LiveDb+N'.sys.tables AS l
INNER JOIN '+@RptDb+N'.sys.tables AS r
ON l.name = r.name
AND SCHEMA_NAME(l.schema_id) = SCHEMA_NAME(r.schema_id)
WHERE l.type = ''U''
AND r.type = ''U'''

EXEC sp_executesql @sql

OPEN trigTables

FETCH NEXT FROM trigTables
INTO @table, @tid

WHILE @@FETCH_STATUS = 0
BEGIN
-- get primary key info
SET @sql = N'
SET @pk = NULL;
SELECT @pk = ISNULL(@pk+N'' AND '',N'''')+N''a.''+QUOTENAME(c.name)+N'' = '+@table+N'.''+QUOTENAME(c.name)
FROM '+@LiveDb+N'.sys.key_constraints AS kc
INNER JOIN '+@LiveDb+N'.sys.index_columns AS ic
ON kc.parent_object_id = ic.object_id
AND kc.unique_index_id = ic.index_id
INNER JOIN '+@LiveDb+N'.sys.columns AS c
ON ic.object_id = c.object_id
AND ic.column_id = c.column_id
WHERE kc.parent_object_id = @tableID
AND kc.type = ''PK'''

EXEC sp_executesql @stmt = @sql
,@parameters = N'@pk NVARCHAR(MAX) OUTPUT, @tableID INT'
,@pk = @pkJoin OUTPUT
,@tableID = @tid

-- generate the column lists needed for the replication procs
SET @sql = N'
SET @upd = NULL;
SELECT @upd = ISNULL(@upd+N''
,'',N'''')+QUOTENAME(c.name)+N'' = a.''+QUOTENAME(c.name)
FROM '+@LiveDb+N'.sys.columns AS c
INNER JOIN '+@LiveDb+N'.sys.types AS t
ON c.user_type_id = t.user_type_id
AND c.system_type_id = t.system_type_id
WHERE c.object_id = @tableID
AND t.name != N''timestamp''
AND CHARINDEX(QUOTENAME(c.name),@pk) = 0
ORDER BY c.column_id'

EXEC sp_executesql @stmt = @sql
,@parameters = N'@pk NVARCHAR(MAX), @tableID INT, @upd NVARCHAR(MAX) OUTPUT'
,@pk = @pkJoin
,@tableID = @tid
,@upd = @updList OUTPUT

SET @sql = N'
SET @ins = NULL;
SET @from = NULL;
SELECT @ins = ISNULL(@ins+N''
,'',N'''')+QUOTENAME(c.name)
,@from = ISNULL(@from+N''
,'',N'''')+QUOTENAME(c.name)+N'' ''+t.name
+CASE WHEN t.name IN (N''CHAR'',N''VARCHAR'',N''NCHAR'',N''NVARCHAR'',N''BINARY'',N''VARBINARY'')
THEN N''(''+CASE WHEN c.max_length = -1 THEN N''MAX'' ELSE CAST(c.max_length AS NVARCHAR) END+N'')''
WHEN t.name = N''DECIMAL'' THEN N''(''+CAST(c.precision AS NVARCHAR)+N'',''+CAST(c.scale AS NVARCHAR)+N'')''
ELSE N'''' END
+N'' ''''''''@''+c.name+N''''''''''''
FROM '+@LiveDb+N'.sys.columns AS c
INNER JOIN '+@LiveDb+N'.sys.types AS t
ON c.user_type_id = t.user_type_id
AND c.system_type_id = t.system_type_id
WHERE c.object_id = @tableID
AND t.name != N''timestamp''
ORDER BY c.column_id'

EXEC sp_executesql @stmt = @sql
,@parameters = N'@tableID INT, @ins NVARCHAR(MAX) OUTPUT, @from NVARCHAR(MAX) OUTPUT'
,@tableID = @tid
,@ins = @insList OUTPUT
,@from = @fromList OUTPUT

-- find out if there are any identity columns
SET @sql = N'
SELECT @ident = CAST(MAX(CAST(is_identity AS INT)) AS BIT)
FROM '+@LiveDb+N'.sys.columns
WHERE object_id = @tableID'

EXEC sp_executesql @stmt = @sql
,@parameters = N'@tableID INT, @ident BIT OUTPUT'
,@tableID = @tid
,@ident = @isIdent OUTPUT

-- build the sql needed to create the trigger and proc for each table
SET @sql = N'
USE '+@LiveDb+N'
IF EXISTS ( SELECT
*
FROM
sys.objects
WHERE
name = ''TR_'+@table+N'_UI''
AND type = ''TR'' )
BEGIN
DROP TRIGGER [TR_'+@table+N'_UI]
END'

EXEC sp_executesql @sql

SET @sql = N'
EXEC '+@LiveDb+N'.sys.sp_executesql N''
CREATE TRIGGER TR_'+@table+N'_UI ON dbo.'+@table+N'
FOR INSERT, UPDATE
AS
BEGIN
DECLARE @xml XML

SET @xml = (
SELECT
*
FROM
inserted AS '+@table+N'
FOR
XML AUTO
,ROOT(''''root'''')
)
-- debug: remove next line
--SELECT @xml
IF @xml IS NOT NULL
EXEC procSendReplicationData
@xml
END'''

EXEC sp_executesql @sql

SET @sql = N'
USE '+@RptDb+N'
IF EXISTS ( SELECT
*
FROM
sys.objects
WHERE
name = ''procApplyTransactions_'+@table+N'''
AND type = ''P'' )
BEGIN
DROP PROCEDURE [procApplyTransactions_'+@table+N']
END'

EXEC sp_executesql @sql

SET @sql = N'
EXEC '+@RptDb+N'.sys.sp_executesql N''
CREATE PROCEDURE procApplyTransactions_'+@table+N' (@xml XML)
AS
BEGIN
DECLARE @hDoc INT

EXEC sp_xml_preparedocument
@hDoc OUTPUT
,@xml

UPDATE
'+@table+N'
SET
'+@updList+N'
FROM
OPENXML (@hDoc, ''''/root/'+@table+N''''')
WITH ('+@fromList+N') a
WHERE
'+@pkJoin+N'

'+CASE WHEN @isIdent = 1 THEN N'SET IDENTITY_INSERT '+@table+N' ON' ELSE N'' END+N'
INSERT INTO '+@table+N'
('+@insList+N')
SELECT
'+@insList+N'
FROM
OPENXML (@hDoc, ''''/root/'+@table+N''''')
WITH ('+@fromList+N') a
WHERE
NOT EXISTS ( SELECT
*
FROM
'+@table+N'
WHERE
'+@pkJoin+N' )
'+CASE WHEN @isIdent = 1 THEN N'SET IDENTITY_INSERT '+@table+N' OFF' ELSE N'' END+N'

EXEC sp_xml_removedocument
@hDoc
END'''

--print @sql
EXEC sp_executesql @sql

-- add to master proc
SET @spMastsql = @spMastsql + N'
IF EXISTS ( SELECT
*
FROM
OPENXML(@hDoc, ''''/root/'+@table+N''''') )
BEGIN
EXEC procApplyTransactions_'+@table+N'
@xml
END
ELSE '

FETCH NEXT FROM trigTables
INTO @table, @tid
END

CLOSE trigTables
DEALLOCATE trigTables

SET @spMastsql = LEFT(@spMastsql,LEN(@spMastsql)-4) + N'

EXEC sp_xml_removedocument
@hDoc

END CONVERSATION @handle
END
END'''

--select @spMastsql
EXEC sp_executesql @spMastsql

-- create proc to deal with messages on source queue
SET @sql = N'
USE '+@LiveDb+N'

IF EXISTS ( SELECT *
FROM sys.objects
WHERE name = ''procReceiveBrokerResponse''
AND type = ''P''
AND SCHEMA_NAME(schema_id) = ''dbo'' )
BEGIN
DROP PROCEDURE dbo.procReceiveBrokerResponse
END
'

EXEC sp_executesql @sql

SET @sql = N'
EXEC '+@LiveDb+N'.sys.sp_executesql N''
CREATE PROCEDURE dbo.procReceiveBrokerResponse
AS
DECLARE
@dh UNIQUEIDENTIFIER
,@message_type SYSNAME
,@message_body NVARCHAR(4000)

BEGIN TRANSACTION

WAITFOR (

RECEIVE TOP(1) @dh = [conversation_handle],
@message_type = [message_type_name],
@message_body = CAST([message_body] AS NVARCHAR(4000))
FROM Q_ReportingReplication_'+@LiveDb+N'), TIMEOUT 1000;

WHILE @dh IS NOT NULL
BEGIN
IF @message_type = N''''http://schemas.microsoft.com/SQL/ServiceBroker/Error''''
BEGIN

RAISERROR (N''''Received error %s from service [Target]'''', 10, 1, @message_body) WITH LOG

END

END CONVERSATION @dh

COMMIT

SELECT
@dh = NULL

BEGIN TRANSACTION

WAITFOR (

RECEIVE TOP(1) @dh = [conversation_handle],
@message_type = [message_type_name],
@message_body = CAST([message_body] AS NVARCHAR(4000))
FROM Q_ReportingReplication_'+@LiveDb+N'), TIMEOUT 1000;

END

COMMIT''
'

EXEC sp_executesql @sql

-- alter source queue to process responses
SET @sql = N'
USE '+@LiveDb+N'

ALTER QUEUE Q_ReportingReplication_'+@LiveDb+N'
WITH STATUS = ON
,ACTIVATION (
STATUS = ON,
MAX_QUEUE_READERS = 1,
PROCEDURE_NAME = procReceiveBrokerResponse,
EXECUTE AS OWNER) ;'

EXEC sp_executesql @sql

-- finally alter the destination queue to activate the receiving proc when a message is received
SET @sql = N'
USE '+@RptDb+N'

ALTER QUEUE Q_ReportingReplication_'+@RptDb+N'
WITH STATUS = ON
,ACTIVATION (PROCEDURE_NAME = dbo.procApplyTransactions_Master
,MAX_QUEUE_READERS = 1
,EXECUTE AS OWNER
,STATUS = ON)'

EXEC sp_executesql @sql
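
Once the script has run, a few catalog queries can help confirm that messages are flowing. This is only a sketch of checks I would try first: LiveDb and RptDb are hypothetical database names standing in for the values of @LiveDb and @RptDb.

```sql
-- Hypothetical database names; substitute the values used for @LiveDb / @RptDb.
SELECT name, is_broker_enabled, is_trustworthy_on
FROM sys.databases
WHERE name IN (N'LiveDb', N'RptDb');

-- Run in the source database: messages that have not been delivered yet
-- wait here, and transmission_status gives the reason when delivery failed.
SELECT to_service_name, enqueue_time, transmission_status
FROM sys.transmission_queue;

-- Run in either database: check that the queue can still receive and that
-- activation is switched on.
SELECT name, is_receive_enabled, is_activation_enabled
FROM sys.service_queues
WHERE name LIKE N'Q_ReportingReplication%';
```

An empty sys.transmission_queue after a test insert, together with the row showing up in the reporting table, is a reasonable sign that the whole chain works.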




social.technet.microsoft.com/Forums

Friday, May 10, 2013

[SQL Server Data Warehousing] Need help with Change Data Capture


It would take a little effort, but using a combination of triggers and Service Broker you can capture inserts/updates/deletes and fire async messages to store the data in a separate database, or even on a separate SQL Server instance.
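
For the separate-instance case, each instance needs a Service Broker endpoint and the routes need a network address. The following is a bare-bones sketch under several assumptions: the endpoint name, port 4022, the host names and the LiveDb/RptDb service name suffixes are all hypothetical. Security setup (granting CONNECT on the endpoint, SEND on the services, or certificates where Windows authentication is not available) is left out.

```sql
-- On each instance, in master. Endpoint name and port are assumptions.
CREATE ENDPOINT SsbEndpoint
    STATE = STARTED
    AS TCP (LISTENER_PORT = 4022)
    FOR SERVICE_BROKER (AUTHENTICATION = WINDOWS);
GO

-- In the source database: send messages for the target service to the
-- other machine (hypothetical host name reporting-host).
CREATE ROUTE RT_ToReporting
    WITH SERVICE_NAME = 'SVC_ReportingReplication_RptDb',
         ADDRESS = 'TCP://reporting-host:4022';
GO

-- In the target database: route replies back to the source service
-- (hypothetical host name live-host).
CREATE ROUTE RT_ToLive
    WITH SERVICE_NAME = 'SVC_ReportingReplication_LiveDb',
         ADDRESS = 'TCP://live-host:4022';
GO
```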


From your "CDC" database you can then process your change data and do whatever you like with it, including running an ETL process against it for populating a Data Warehouse.
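
If the goal is to keep the raw change data rather than apply it straight to copies of the tables, the receiving proc could simply log each message. A rough sketch, assuming a hypothetical dbo.ChangeLog table in the "CDC" database and the root-wrapped XML messages described in the other answer:

```sql
-- Hypothetical change-log table; the activated proc would insert each
-- received message with: INSERT INTO dbo.ChangeLog (Payload) VALUES (@xml);
CREATE TABLE dbo.ChangeLog (
    ChangeId    BIGINT IDENTITY(1,1) PRIMARY KEY,
    ReceivedAt  DATETIME2(3) NOT NULL DEFAULT SYSUTCDATETIME(),
    Payload     XML NOT NULL,
    ProcessedAt DATETIME2(3) NULL
);
GO

-- ETL step: claim every unprocessed row, then read the claimed rows in order.
DECLARE @batch TABLE (ChangeId BIGINT PRIMARY KEY);

UPDATE dbo.ChangeLog
SET ProcessedAt = SYSUTCDATETIME()
OUTPUT inserted.ChangeId INTO @batch
WHERE ProcessedAt IS NULL;

SELECT c.ChangeId,
       c.Payload.value('local-name((/root/*)[1])', 'nvarchar(128)') AS TableName,
       c.Payload
FROM dbo.ChangeLog AS c
INNER JOIN @batch AS b
    ON b.ChangeId = c.ChangeId
ORDER BY c.ChangeId;
```

Marking rows as processed before the load has succeeded keeps the sketch short. A real ETL job would probably do the claim and the load in one transaction, or mark the rows afterwards.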


