Developing the component

Environment

First of all, we need to set up our environment. What we need is Microsoft SQL Server Developer or Enterprise edition, together with Visual Studio 2010. I strongly recommend setting up this environment on a Windows Server 2008 or 2012 machine; even a VM is fine for developing this component. When installing Microsoft SQL Server, be sure to install everything related to Business Intelligence and Integration Services, paying attention to the SDK.

The component we are going to develop will work with Microsoft SQL Server 2012, so .NET Framework 4.0 is required.

Once we open Visual Studio 2010, we need to create a new C# class library. Be sure to select .NET Framework 4.0 in the project's properties pane. At this point we need to add the required library references, since we are going to extend a base class named PipelineComponent. The following are all the libraries you need to reference and their respective paths:

Microsoft.SqlServer.DTSPipelineWrap

C:\Program Files (x86)\Microsoft SQL Server\110\SDK\Assemblies\Microsoft.SqlServer.DTSPipelineWrap.dll

Microsoft.SqlServer.DTSRuntimeWrap

C:\Program Files (x86)\Microsoft SQL Server\110\SDK\Assemblies\Microsoft.SqlServer.DTSRuntimeWrap.dll

Microsoft.SqlServer.PipelineHost

C:\Program Files (x86)\Microsoft SQL Server\110\SDK\Assemblies\Microsoft.SqlServer.PipelineHost.dll

 

For every library, in the properties pane, set “Embed Interop Types” to False.
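Before moving on, a quick sanity check can save some head-scratching. The small throwaway console program below (not part of the component, and assuming the SDK is installed in the default path shown above) simply loads the three assemblies and prints their versions, confirming that the references will resolve:

using System;
using System.Reflection;

class ReferenceCheck
{
    static void Main()
    {
        string sdk = @"C:\Program Files (x86)\Microsoft SQL Server\110\SDK\Assemblies\";
        string[] dlls =
        {
            "Microsoft.SqlServer.DTSPipelineWrap.dll",
            "Microsoft.SqlServer.DTSRuntimeWrap.dll",
            "Microsoft.SqlServer.PipelineHost.dll"
        };
        foreach (string dll in dlls)
        {
            // Assembly.LoadFrom resolves the file at the given path and exposes its metadata.
            Assembly asm = Assembly.LoadFrom(sdk + dll);
            Console.WriteLine("{0} -> version {1}", dll, asm.GetName().Version);
        }
    }
}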

Straight to the point: code.

What we are going to develop is a custom DataFlow component. We’ll extend the PipelineComponent class, which allows us to override some methods to process inputs by using a buffer.

Here is a very basic code listing:


using System;
using System.Collections;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.SqlServer.Dts.Pipeline;
using Microsoft.SqlServer.Dts.Pipeline.Wrapper;
using Microsoft.SqlServer.Dts.Runtime.Wrapper;

namespace com.webkingsoft.ssis.pipelinecomponent
{
    [DtsPipelineComponent
        (
            DisplayName = "WS MassTrimmer",
            ComponentType = ComponentType.Transform,
            IconResource = "com.webkingsoft.ssis.pipelinecomponent.MassTrimmer.ico"
        )
    ]
    public class MassTrimmer : PipelineComponent
    {
        public const int PARALLEL_THD = 4;
        ArrayList indexesToTrim = new ArrayList();
        ArrayList indexesToCopy = new ArrayList();
        object[] indexesToTrimArray;
        bool userParallelism = false;

        public override void ProvideComponentProperties()
        {
            base.ProvideComponentProperties();
            ComponentMetaData.InputCollection[0].Name = "Input to trim";
            ComponentMetaData.OutputCollection[0].Name = "Trimmed outputs";
            ComponentMetaData.Description = "Webkingsoft MassTrimmer: allows you to trim all columns of a table in a couple of clicks.";
            ComponentMetaData.ContactInfo = "Alberto Geniola, albertogeniola@gmail.com";
        }

        ///<summary>
        /// This method is called once before execution. We set up the component by choosing which strategy to use when processing input:
        /// if we have fewer than PARALLEL_THD string columns as input, we use a serial approach; otherwise we use parallelism.
        ///</summary>
        public override void PreExecute()
        {
            IDTSInput100 input = ComponentMetaData.InputCollection[0];
            IDTSInputColumnCollection100 inputColumns = input.InputColumnCollection;
            foreach (IDTSInputColumn100 column in inputColumns)
            {
                if (column.DataType == DataType.DT_STR || column.DataType == DataType.DT_WSTR)
                {
                    indexesToTrim.Add((int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID));
                }
                else
                    indexesToCopy.Add((int)BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID));
            }

            if (indexesToTrim.Count > PARALLEL_THD)
                userParallelism = true;
            else
                userParallelism = false;
            indexesToTrimArray = indexesToTrim.ToArray();
        }

        ///<summary>
        /// This method processes the rows. The SSIS runtime collects several rows into a buffer and passes the whole
        /// buffer to this method, so we have to process multiple rows per call.
        ///</summary>
        ///<param name="inputID"></param>
        ///<param name="buffer"></param>
        public override void ProcessInput(int inputID, PipelineBuffer buffer)
        {
            while (buffer.NextRow())
            {
                // Since this component is useful when the input contains many columns, it's crucial to
                // optimize parallelism. If the number of string columns is lower than PARALLEL_THD we use a serial loop,
                // otherwise we use a parallel loop.
                if (userParallelism)
                    Parallel.ForEach(indexesToTrimArray, new ParallelOptions { MaxDegreeOfParallelism = PARALLEL_THD }, columnIndex =>
                    {
                        string str = buffer.GetString((int)columnIndex);
                        buffer.SetString((int)columnIndex, str.Trim());
                    });
                else
                    foreach (int columnIndex in indexesToTrim)
                    {
                        string str = buffer.GetString((int)columnIndex);
                        buffer.SetString((int)columnIndex, str.Trim());
                    }
            }
        }
    }
}


 

I’ll now briefly comment on each section of the code, to help you understand what it does and why it was done that way. For your information, I started from some sample code on MSDN: there is a nice example of an “uppercase” DataFlow component, which transforms every input string into an uppercase string. Basically, I added some optimizations, comments and logic to accomplish my objective.

 

Attributes

In order to use our component in an SSIS environment, we need to comply with a specific interface contract. This includes adding the DtsPipelineComponent attribute to our class, specifying some information such as the component name, its icon and a short description. If you want to embed an icon in the project, you’ll need to set its build action to “Embedded Resource” and select it as the main icon in the project-property pane.

After that, we need to extend the PipelineComponent class, which implements all the needed interfaces for us. A PipelineComponent is an SSIS transformation component which “virtually” processes one row at a time, getting a row from the input, processing it and passing it to the output.

ProvideComponentProperties

This method is the first one we need to override. Here we set up all the metadata about the component. In this case, I’ve simply called the parent class method, set some information about the component, and renamed the input and output to make them easier to understand.

PreExecute

The PreExecute method is called at run time to set up the component’s initial state, before the row-processing method starts. The more work we do here, the lighter the processing method will be. In my case I use this method to scan all input columns and identify the textual ones, saving their indexes into an ArrayList. This is done by simply reading each column’s DataType and checking whether it is DT_STR or DT_WSTR (string or wide string). Afterwards, I copy those values into an array. This step serves two main purposes:

-          We’ll probably use Parallel.ForEach inside the ProcessInput method, which requires an array (or another enumerable) as its argument;

-          Using an array costs more memory, but gives faster access to the data (a typed-collection alternative is sketched right after this list).
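As a side note, the same bookkeeping could be done with typed collections, so that the hot loop receives plain int values and no casts or unboxing are needed. This is just a sketch of an alternative, not part of the component above, and the helper name CollectTrimIndexes is made up for illustration; it would live inside the component class, where BufferManager is available:

// Hypothetical helper: same column scan as in PreExecute, but with typed collections.
// List<int>.ToArray() yields an int[], so Parallel.ForEach would hand each lambda an int directly.
private int[] CollectTrimIndexes(IDTSInput100 input)
{
    List<int> trimIndexes = new List<int>();
    foreach (IDTSInputColumn100 column in input.InputColumnCollection)
    {
        if (column.DataType == DataType.DT_STR || column.DataType == DataType.DT_WSTR)
            trimIndexes.Add(BufferManager.FindColumnByLineageID(input.Buffer, column.LineageID));
    }
    return trimIndexes.ToArray();
}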

Then, counting the number of textual columns, there’s a check against a threshold value: if the list contains more than PARALLEL_THD elements, the code will use a parallel foreach in the ProcessInput method; otherwise, it follows a plain iterative approach for processing the input.

ProcessInput

This method is invoked an unspecified number of times by the SSIS environment. As I mentioned before, the framework could conceptually invoke it for every row received from the input; however, for optimization’s sake, a “buffer” object collects more than one row at a time, and that buffer is passed as the argument to this method. For this reason, it’s necessary to scan the buffer until it is exhausted and process every row it contains. Inside, there is a conditional logic split: if we counted more than PARALLEL_THD elements to trim, we use a Parallel.ForEach loop, which helps a lot, especially on powerful multicore servers. I’ve analyzed the performance impact of this parameter in the last section of this article.
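If you want to verify at run time which strategy was actually selected, one option (just a sketch, not part of the listing above) is to raise an informational event at the end of PreExecute, which then shows up in the package’s execution log:

// Report the chosen strategy through the standard SSIS information event.
bool fireAgain = true;
ComponentMetaData.FireInformation(0, ComponentMetaData.Name,
    string.Format("MassTrimmer will trim {0} column(s) using the {1} strategy.",
        indexesToTrim.Count, userParallelism ? "parallel" : "serial"),
    string.Empty, 0, ref fireAgain);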

That’s it. However, there are many possible enhancements: letting the package designer set the degree of parallelism is a good starting point.
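As a sketch of that idea (the property name MaxDegreeOfParallelism and the fallback logic are my own choices, not part of the original component), a custom property could be registered in ProvideComponentProperties and read back in PreExecute:

// In ProvideComponentProperties: expose a property the package designer can edit
// in the component's editor.
IDTSCustomProperty100 dopProperty = ComponentMetaData.CustomPropertyCollection.New();
dopProperty.Name = "MaxDegreeOfParallelism";
dopProperty.Description = "Maximum number of threads used while trimming (1 = serial).";
dopProperty.Value = PARALLEL_THD;

// In PreExecute: read the configured value back, falling back to the default.
int maxDop = PARALLEL_THD;
object configured = ComponentMetaData.CustomPropertyCollection["MaxDegreeOfParallelism"].Value;
if (configured is int && (int)configured > 0)
    maxDop = (int)configured;
// maxDop would then replace PARALLEL_THD in the ParallelOptions used by ProcessInput.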
