Schlagwort-Archive: Quarantäne

Quarantäne-Teil 3: Anlegen der zusätzlichen Komponenten

Nachdem wir im letzten Blog-Kapitel alle Komponenten eines Data Flows durchlaufen konnten und dazu erkennen konnten, ob eine Fehlerbehandlung möglich und noch nicht vorhanden ist, müssen wir nun die Fehlerbehandlung in diesen Fällen einbauen.

Die Fehlerbehandlung soll wie folgt funktionieren:

  • Fehlerbehandlung auf Redirect Row einstellen
  • Den Error Output in eine neu zu erstellende Derived Column leiten.
  • In der Derived Column sollen einige zusätzliche Attribute wie Fehlermeldung, aber auch Primärschlüssel hinzugefügt werden
  • Das Ergebnis läuft dann in einen UNION ALL, der bereits im Paket existiert.

Damit ist die Quarantäne noch nicht ganz fertig. Das UNION ALL führt dann zu einem SQL-Server-Ziel. Dort werden die Spalten aus der Union All in die Datenbank geschrieben. Diesen Teil habe ich, da er nur einmal (je Data Flow) zu erstellen ist, aber manuell gelöst. Hier ist der Aufwand geringer als eine automatisierte Lösung zu implementieren.

Fehlerbehandlung auf Redirect Row umstellen

Interessanter Weise gibt es unterschiedliche Arten von Fehlerbehandlungen in SSIS. Bei manchen Komponenten wird die Fehlerbehandlung einzeln für jede Spalte (z.B. derived column) definiert, in anderen nur global für die gesamte Komponente, in manchen auch für beides.

Der Lookup hat beide Fehlerbehandlungen:

image

Hier ein Beispiel für die Fehlerbehandlung einer Derived Column:

image

Im Advanced Editor sieht das dann so aus:

image

Und die globale Komponenten-Fehlerbehandlung wird nicht verwendet:

image

Wichtig ist zu bemerken, dass diese Einstellungen natürlich nicht in dem Error Output, sondern in dem normalen (bzw. allen normalen) Output(s) und Input(s) gemacht werden müssen.

Deswegen durchläuft der Code alle Inputs und alle Spalten aller Inputs und das gleiche für die Outputs:

For Each outp As IDTSOutput100 In comp.OutputCollection
‚ globale Fehlerhandlung
If outp.ErrorRowDisposition = DTSRowDisposition.RD_FailComponent Then
outp.ErrorRowDisposition = DTSRowDisposition.RD_RedirectRow
End If
If outp.TruncationRowDisposition = DTSRowDisposition.RD_FailComponent Then
outp.TruncationRowDisposition = DTSRowDisposition.RD_RedirectRow
End If
‚Fehlerbehandlung je Spalte
For Each col As IDTSOutputColumn100 In outp.OutputColumnCollection
If col.ErrorRowDisposition = DTSRowDisposition.RD_FailComponent Then
col.ErrorRowDisposition = DTSRowDisposition.RD_RedirectRow
End If
If col.TruncationRowDisposition = DTSRowDisposition.RD_FailComponent Then
col.TruncationRowDisposition = DTSRowDisposition.RD_RedirectRow
End If
Next col
Next outp

Und das gleiche für die Inputs:

For Each inp As IDTSInput100 In comp.InputCollection . . .

Am besten merkt man sich noch in einer boolschen Variablen, ob ein Redirect Row eingestellt wurde. Nur dann darf man nämlich den nächsten Schritt machen.

Dies ist wichtig, da die Fehlermeldungen bei der SSIS-API-Entwicklung meist kryptisch sind, da .NET-Wrapper von der API verwendet werden.  

Eine Derived Column-Komponente erstellen

Wir erstellen sie so:

’neue Derived Column
Dim compDerivedCol As IDTSComponentMetaData100 = pipe.ComponentMetaDataCollection.New()
compDerivedCol.ComponentClassID = „DTSTransform.DerivedColumn“
Dim DesignDerivedTransformColumns As CManagedComponentWrapper = compDerivedCol.Instantiate()
DesignDerivedTransformColumns.ProvideComponentProperties()
compDerivedCol.Name = „Fehler_“ & comp.Name
compDerivedCol.InputCollection(0).ExternalMetadataColumnCollection.IsUsed = False
compDerivedCol.InputCollection(0).HasSideEffects = False

Man beachte das Initiate(). Dadurch wird die derived Column wie im Visual Studio auch instantiiert. Das heißt, man kann auf alle Voreinstellungen zugreifen.

Den Fehler-Output und die Derived Column verbinden

Hierzu legt man einen neuen Pfad an, der den Error Output (den wir im letzten Blog gefunden und gemerkt hatten) mit der Derived Column verbindet:

Dim path As IDTSPath100 = pipe.PathCollection.New()
path.AttachPathAndPropagateNotifications(comp.OutputCollection(nrOfErrorOutput), compDerivedCol.InputCollection(0))

Zusätzliche Spalten in der Derived Column anlegen

Da man das immer wieder braucht, habe ich dazu eine Methode erstellt.

Diese erhält als Parameter:

  • die Komponente (compDerivedCol)
  • den Namen der Spalte
  • den Datentyp, also zum Beispiel Microsoft.SqlServer.Dts.Runtime.Wrapper.DataType.DT_I4
    Hier kann man wunderbar den vorhandenen enum verwenden.
  • die Länge des Datentyps – nur bei Strings (o.ä.) nötig. Bei Integer kann man 0 angeben. Das System macht es automatisch richtig.
  • Die Formel, also z.B. „(DT_WSTR,50)@[System::PackageName]“ für den Paketnamen – oder zum Auswerten eigener Variablen. Die Formel darf keine Anführungszeichen (“) enthalten.
  • Optional die LineageID – dazu komme ich im Anschluss

Hier der Code:

Private Sub addNewColumn2DerivedComponent(derivedColumnComponent As IDTSComponentMetaData100, name As String, dataType As Wrapper.DataType, dataTypeLaenge As Integer, expression As String, Optional lineageIDKeyColumn As Integer = 0)
Dim neueSpalte As IDTSOutputColumn100 = derivedColumnComponent.OutputCollection(0).OutputColumnCollection.New()
neueSpalte.Name = name
neueSpalte.SetDataTypeProperties(dataType, dataTypeLaenge, 0, 0, 0)
neueSpalte.ExternalMetadataColumnID = 0
neueSpalte.ErrorRowDisposition = DTSRowDisposition.RD_IgnoreFailure
neueSpalte.TruncationRowDisposition = DTSRowDisposition.RD_IgnoreFailure

Dim neueSpalteProp As IDTSCustomProperty100 = neueSpalte.CustomPropertyCollection.New()
neueSpalteProp.Name = „Expression“
If lineageIDKeyColumn = 0 Then
neueSpalteProp.Value = expression
Else
neueSpalteProp.Value = „(DT_WSTR,“ & dataTypeLaenge & „)#“ + lineageIDKeyColumn.ToString()
End If
neueSpalteProp = neueSpalte.CustomPropertyCollection.New()
neueSpalteProp.Name = „FriendlyExpression“
neueSpalteProp.Value = expression

End Sub

Hier einige Erklärungen dazu:

  • Am Anfang wird die Spalte angelegt mit dem entsprechenden Namen und Datentyp
  • Die derived column hat als Fehlertyp “Ignore Failure”, weil wir ja sonst die Idee der Quarantäne ad absurdum führen würden.
  • Dann werden zwei Properties angelegt, die notwendig sind – die Expression und die “friendly expression”. Mit der “Expression” rechnet SSIS, die “friendly expression” wird angezeigt.

Und jetzt die Geschichte mit der lineage-ID, wie versprochen:

Man kann auch in den Formeln (expression) auf bestehende Spalten zugreifen. Das funktioniert über die lineage-Id der Spalte, also beispielsweise so “(DT_WSTR,50)#17”. Die “friendly expression” muss nicht unbedingt angegeben werden.

Die lineage-ID kann man sich ermitteln, indem man alle Spalten des Inputs durchläuft und den Namen der Spalte kennt. Allerdings kommt dabei der Wrapper für managed code zum Einsatz, den ich vorher bereits angedeutet habe. Wir benötigen nämlich hier Designer-Funktionalität, wie sie im Visual Studio auch vorhanden ist. Deswegen sieht der Code etwas hässlich aus:

Private Function findeSpalte(comp As IDTSComponentMetaData100, nameSpalte As String, ByRef nameSpalteGefunden As String) As Integer
Dim inp As IDTSInput100 = comp.InputCollection(0)
Dim virtualInp As IDTSVirtualInput100 = inp.GetVirtualInput()
Dim virtualInpCols As IDTSVirtualInputColumnCollection100 = virtualInp.VirtualInputColumnCollection
Dim designer As CManagedComponentWrapper = comp.Instantiate()

nameSpalteGefunden = „“

For Each virtualCol As IDTSVirtualInputColumn100 In virtualInpCols
If virtualCol.Name = nameSpalte Then
designer.SetUsageType(inp.ID, virtualInp, virtualCol.LineageID, DTSUsageType.UT_READONLY)
nameSpalteGefunden = nameSpalte
Return virtualCol.LineageID
End If
Next

Return 0
End Function

Diese Funktion stellt auch den UsageType der Spalte auf Readonly. Dies erscheint zunächst unnötig, ohne dies funktioniert es  aber nicht. Dies spiegelt wider, was der Advanced Editor für die Spalten anzeigt, die in den Formeln verwendet werden:

image

In diesem Beispiel wurde die EinrichtungID-Spalte in einer Formel verwendet.

So habe ich die Primärschlüssel, die ich in der Quarantäne-Tabelle mit protokollieren wollte, gefunden.

Die Derived Column mit der UNION ALL verbinden

Das funktioniert genauso wie der Pfad zur derived column, muss also nicht im Detail beschrieben werden.

Damit sind wir nun fast fertig. Es sind lediglich ein paar Besonderheiten zu beachten – s. nächster Blog-Eintrag.

Quarantäne-Teil 2: Schleife über alle Komponenten eines Data Flows und Untersuchung bestehender Fehlerbehandlung

In dem letzten Blog-Eintrag hatten wir eine Schleife um alle Data Flows erstellt und eine Data Flow identifizieren können.

Heute wollen wir eine Schleife über alle Komponenten bauen und die Komponenten etwas genauer analysieren, insbesondere auf vorhandene Fehlerbehandlung.

Im letzten Kapitel hatte wir eine Methode processAllExecutables geschrieben. Diese enthielt in einer Schleife ein Executable mit Variablennamen e.

Über

If TypeOf e Is TaskHost Then

hatten wir ermittelt, dass es sich um einen Data Flow handelt.

Schleife über alle Komponenten

Nun erstellen wir uns zunächst ein paar Variablen:

Dim th As TaskHost
th = CType(e, TaskHost)
Dim name As String
name = CType(e, TaskHost).Name

If TypeOf th.InnerObject Is
Microsoft.SqlServer.Dts.Pipeline.Wrapper.MainPipe Then
Dim pipe As MainPipe = CType(th.InnerObject, MainPipe)

End If

Diese bedeuten:

  • den Namen des Data Flows in name
  • In th steht der TaskHost, also Data Flow
  • Der th hat ein inneres Objekt, die sogenannte Main Pipe. Diese beinhaltet weiterhin alle Data Flow-Komponente. An ihr sind wir also im besonderen interessiert. Wir haben sie in der Variablen pipe gespeichert.

Nun ist es ganz einfach alle Komponenten eines Data Flows zu durchlaufen, da sie in der Collection ComponentMetaDataCollection des MainPipe-Objekts enthalten sind:

For Each comp As IDTSComponentMetaData100 In pipe.ComponentMetaDataCollection

Next comp

Identifikation des bestehenden Error Handlings

Als erste wollen wir wissen, ob diese Komponente überhaupt grundsätzlich über ein Error Handling verfügen kann. Überraschender Weise gibt es nämlich Komponenten, an die man kein Error Handling anschließen kann, z.B. MultiCast, Union all, aber auch eine Skript-Komponente, die ja definitiv einen Fehler erzeugen kann.

Wie erkennen wir nun, ob eine Komponente ein Error Handling zulässt?

Jede Komponente hat eine Input- und eine OutputCollection. Diese sieht man auch in SSIS im erweiterten Editor, so z.B. bei einer Derived Column:

image

Auf der linken Seite sehen wir 3 Ports dieser Komponente. Davon finden sich im Objekt-Modell der erste in der InputCollection und die anderen beiden in der OutputCollection. Der Fehler-Output unterscheidet sich nun von anderen Outputs dadurch, dass die Property IsErrorOut auf true gesetzt ist.

Wenn wir nun die OutputCollection durchlaufen, können wir den Error Output ermitteln bzw. erkennen, dass diese Komponente gar keinen Error Output ermöglicht.

Wichtig! Wie in der SSIS-Oberfläche auch, sind diese Outputs sichtbar unabhängig davon, ob sie nachher mit Pfaden zu einer anderen Komponente verknüpft sind. Wenn also ein Error Output nicht im Objektmodell vorhanden ist, heißt das, dass keine Fehlerbehandlung für dieses Komponente möglich ist, nicht etwa, dass nur noch keine definiert worden ist.

Was uns jetzt noch interessiert ist, ob für diese Komponente, wenn sie einen Error Output hat, dieser bereits verwendet wird. Das ist dann der Fall, wenn es einen Pfad gibt, der den Error Output als Beginn des Pfades hat. Programmatisch bedeutet das, dass wir die PathCollection durchlaufen müssen und alle Anfangspunkte des Pfades mit dem IdentificationString des Error Outputs vergleichen müssen.

Ich habe das so implementiert:

‚ermittle Error Output
Dim hatErrorOutput As Boolean = False
Dim nrOfErrorOutput As Byte = 0
Dim ErrorOutputSchonVerwendet As Boolean = False

For outpNr As Integer = 0 To comp.OutputCollection.Count – 1
‚Ist das ein Error Output?
If comp.OutputCollection(outpNr).IsErrorOut Then
‚ja!
hatErrorOutput = True
nrOfErrorOutput = outpNr
‚wird dieser Error Output schon in einem Pfad benutzt?
’schaue nach in pipe.PathCollection
Dim id As String
id = comp.OutputCollection.Item(outpNr).IdentificationString
For p As Integer = 0 To pipe.PathCollection.Count – 1
If pipe.PathCollection(p).StartPoint.IdentificationString = id Then
ErrorOutputSchonVerwendet = True
End If
Next p
End If
Next outpNr

Somit wissen wir

  • ob die Komponente ein Error Handling erlaubt (hatErrorOutput)
  • ob für diese Komponente ein Error Handling bereits definiert ist (ErrorOutputSchonVerwendet). Diesen Fall soll unser Quarantäne-Algorithmus nämlich ignorieren (Da hat sich der Entwickler des Pakets ja was dabei gedacht)

Quarantäne–Teil 1: Schleife über alle Data Flows

In einem vorangegangenen Eintrag hatte ich die Idee der Quarantäne vorgestellt, für die wir ein VB-Programm geschrieben haben, das in einem SSIS-Paket das Fehlerhandling umstellt.

DLLs einbinden

Als erstes müssen wir die folgenden DLLs einbinden:

image

also:

  • Microsoft.SqlServer.DTSPipelineWrap
  • Microsoft.SqlServer.DTSRuntimeWrap
  • Microsoft.SqlServer.ManagedDTS

Diese müssen dann mit Imports (VB) bzw. using (c#) eingebunden werden – hier immer die VB-Beispiele:

Imports System
Imports Microsoft.SqlServer.Dts.Runtime
Imports Microsoft.SqlServer.Dts.Pipeline.Wrapper
Imports System.Runtime.InteropServices

Paket laden

Als erstes wollen wir das SSIS-Paket laden und wieder speichern:

Dim ssisApp As New Microsoft.SqlServer.Dts.Runtime.Application
Dim pName As String = “C:\meineProjekte\Paket.dtsx”
Dim package As New Package
package = ssisApp.LoadPackage(pName, Nothing)
ssisApp.SaveToXml(“neuerName.dtsx”, package, Nothing)

Alle Tasks ermitteln

Nun wollen wir in dem SSIS-Paket alle Tasks durchlaufen, um sie verändern zu können. Dazu enthält das SSIS-Paket eine Property “executables”, diese enthält alle Tasks der ersten Ebene.

Mit einer Schleife können wir diese durchlaufen:

For Each e As Executable In package.executables
Next

Allerdings erreichen wir dabei nur die Tasks der ersten Ebene.

In SSIS gibt es aber Tasks, die selber wieder Tasks beinhalten:

  • Sequence
  • For Each Loop
  • For Loop

Diese interne Struktur kann man einfach in der SSIS-Umgebung anhand des Package Explorers sehen. In meinem Beispiel habe ich eine Sequence mit 2 Data Flows  und nach der Sequence noch einen Data Flow (namens “skript”):

image

Um alle Tasks durchlaufen zu können, müssen wir also rekursiv vorgehen:

im Hauptprogramm:


processAllExecutables(package.Executables, 1)

und dann als eigene Methode:

Private Sub processAllExecutables(
ByVal executables As Executables,
ByVal depth As Integer)

For Each e As Executable In executables

Dim d1 As Integer = depth + 1
If TypeOf e Is ForLoop Then
processAllExecutables(CType(e, ForLoop).Executables, d1)
End If
If TypeOf e Is Sequence Then
processAllExecutables(CType(e, Sequence).Executables, d1)
End If
If TypeOf e Is ForEachLoop Then
processAllExecutables(CType(e, ForEachLoop).Executables, d1)
End If
Next
End Sub

Wie man sieht, kann man über TypeOf Is <<Typ>> abfragen, ob die Task e einen bestimmten Typ hat. Bei den oben genannten Typen rufen wir dann die selbe Methode rekursiv auf – ich habe der Übersichtlichkeit halber die Tiefe als zusätzlichen Parameter übergeben, auch wenn das nicht unbedingt notwendig ist.

Somit haben wir das Ziel des heutigen Blogs fast erreicht: Wir durchlaufen alle Tasks. Als nächstes müssen wir Data Flows erkennen und verarbeiten.

DataFlows werden in der SSIS-API als TaskHost bezeichnet. Deswegen können wir das so abprüfen:

If TypeOf e Is TaskHost Then

End If

Damit haben wir eine Schleife über alle Data Flows programmiert.

Error Handling im Data Flow eines SSIS-Pakets

In den meisten Komponenten innerhalb eines Data Flows in einem SSIS-Paket können als Error Handling folgende Einstellungen getroffen werden:

Englisch Deutsch Bemerkung
Fail Component Fehler bei Komponente Dies ist die Standard-Einstellung. Sie führt dazu, dass bei einem – wie auch immer gearteten Fehler – der gesamte Data Flow fehlschlägt. Noch nicht verarbeitete Daten werden nicht weiter verarbeitet.
Ignore Failure Fehler ignorieren Diese Einstellung würde bedeuten, dass trotz des Fehlers normal weitergearbeitet wird. Dies ist in der Regel nicht sinnvoll.
Redirect Row Zeile umleiten Die fehlerhafte Zeile wird umgeleitet und der aufgetretene Fehler kann durch einen speziellen Ablauf behandelt werden.

In der Regel ist es sinnvoll, während der Entwicklung “Fail Component” einzusetzen. Während der Entwicklung und der Tests möchte man ja schließlich wissen, ob und wenn ja, welche Fehler auftreten, damit man sie programmatisch beheben kann.

Auch für den produktiven Einsatz kann diese Einstellung sinnvoll sein, da sie dafür sorgt, dass keine fehlerhaften Daten ins DWH übernommen werden.

Auf der anderen Seite ist die Einstellung problematisch, da dann ein fehlerhafter Satz die ETL-Strecke zum Erliegen bringt. In einem meiner Projekte laden wir alle 10 Minuten aus einem Quellsystem, das sehr großzügig mit Eingaben vorgeht.  Für dieses Projekt haben wir eine Quarantäne definiert:

Alle Komponenten sollen auf “Redirect Row” gestellt werden und diese fehlerhaften Sätze werden in eine Quarantäne-Tabelle geschrieben (unter Angabe des Primärschlüssels und des aufgetretenen Fehlers).

Danach müssen noch “halbe” Datensätze entfernt werden – es kann ja sein, dass in einem Vater-Satz ein Fehler auftritt und dieser in die Quarantäne läuft, aber alle Kinder-Sätze (der 1:n-Beziehung) richtig durchlaufen würden. Diese sollen dann aber natürlich nicht im DWH landen.

Die Umstellung der Komponenten auf “Redirect Row” wollten wir dabei nicht manuell durchführen, da wir Data Flows mit ca. 150 Komponenten haben. Deshalb haben wir ein (VB).NET-Programm erstellt, das via SSIS API diese Komponenten umstellt. Dieses Programm werde ich in den nächsten Blog-Einträgen vorstellen.