В качестве предисловия
Наверное, многие из вас в своей практике сталкивались с задачей сбора почты с ряда ящиков. Зачем это может быть нужно? Наверное, потому что это универсальный механизм обмена данными между системами. Множество библиотек под любые языки, реализующих SMTP, POP3, IMAP, готовые решения по реализации стэка сообщений (как я сложно назвал почтовый ящик...) и т.д.
Неудивительно, что многие интеграционные задачи реализуются именно через почту. Тут в дело вступает некий сервис, который умеет эту почту быстро забирать, категоризировать и выполнять необходимые действия.
Кому достаточно приведенного ниже кода — дальше могут не читать:
foreach (var mailbox in mailboxes)
using (var client = new Pop3Client())
{
client.Connect(Hostname, Port, false);
client.Authenticate(User, Password);
var count = client.GetMessageCount();
for (var i = 0; i < count; i++)
{
Mail = client.GetMessage(i + 1);
var cat = SortMail(Mail);
DoSomething(Mail, cat);
}
}
Что будем делать
Сразу сделаем ряд допущений:
1) Собирать почту нужно для нескольких систем. Может в будущем ещё для нескольких. И ещё… В общем решение должно быть универсальное;
2) Почты возможно будет много — следует из пункта 1 (а иначе я бы не писал этот пост);
3) Почту придется парсить;
4) Все ящики сервисные — пользователи туда не лезут.
Что будем использовать
Система должна работать 24/7, поэтому реализуем её в виде Windows Service. Для этих целей предлагаю сразу использовать TopShelf.
Разумеется, всё должно быть распараллелено. Тут на сцену выходит моя любимая библиотека TPL DataFlow.
Забирать почту будем по POP3. Все «модные штучки» IMAP в данной задаче излишни — надо как можно быстрее и проще забрать исходник письма и удалить его на сервера. POP3 тут хватит за глаза. Используем OpenPop.NET.
Как уже говорилось, почту будем парсить. Может через Regex, может кастомные логики… мало ли что. Именно поэтому нужно гибко и быстро подпихивать новые правила с помощью плагинов. Тут нам поможет Managed Extensibility Framework.
Логи пишем через NLog.
В качестве факультатива прикрутим мониторинг в Zabbix. (Мы же собрались работать 24/7 и выдавать хваленую скорость — нужно следить за этим).
Поехали
Создаем обычное консольное приложение. Открываем NuGet консоль и ставим все нужные пакеты:
Install-Package Nlog
Install-Package OpenPop.NET
Install-Package TopShelf
Install-Package Microsoft.TPL.DataFlow
Переходим в папку проекта, создаем App.Debug.config и App.Release.config. Выгружаем проект из студии, открываем его код (Здесь и далее TopCrawler.csproj). В секцию с конфигом добавляем:
Конфигурации
<None Include="App.Debug.config">
<DependentUpon>App.config</DependentUpon>
</None>
<None Include="App.Release.config">
<DependentUpon>App.config</DependentUpon>
</None>
А ниже собственный таргет для MSBuild:
Transform target
<UsingTask TaskName="TransformXml" AssemblyFile="$(MSBuildExtensionsPath32)\Microsoft\VisualStudio\v$(VisualStudioVersion)\Web\Microsoft.Web.Publishing.Tasks.dll" />
<Target Name="AfterCompile" Condition="Exists('App.$(Configuration).config')">
<!--Generate transformed app config in the intermediate directory-->
<TransformXml Source="App.config" Destination="$(IntermediateOutputPath)$(TargetFileName).config" Transform="App.$(Configuration).config" />
<!--Force build process to use the transformed configuration file from now on.-->
<ItemGroup>
<AppConfigWithTargetPath Remove="App.config" />
<AppConfigWithTargetPath Include="$(IntermediateOutputPath)$(TargetFileName).config">
<TargetPath>$(TargetFileName).config</TargetPath>
</AppConfigWithTargetPath>
</ItemGroup>
</Target>
Лично я привык именно таким способом — по старинке — добавлять трансформацию конфигов для разделения сред.
Для удобства предлагаю strongly-type конфиги. Отдельный класс будет читать конфигурацию. (О теоретических аспектах такого решения можно пообщаться в комментах). Конфиги, логи, мониторинг — отличный повод реализовать паттерн Singleton.
Создаем в проекте одноименную папку (должен же быть порядок). Внутри создаем 3 класса — Config, Logger, Zabbix. Наш логгер:
Logger
static class Logger
{
public static NLog.Logger Log { get; private set; }
public static NLog.Logger Archive { get; private set; }
static Logger()
{
Log = LogManager.GetLogger("Global");
Archive = LogManager.GetLogger("Archivator");
}
}
Мониторинг с помощью Zabbix заслуживает отдельного поста, поэтому я просто оставлю тут класс, реализующий агента:
Zabbix
namespace TopCrawler.Singleton
{
/// <summary>
/// Singleton: zabbix sender class
/// </summary>
static class Zabbix
{
public static ZabbixSender Sender { get; private set; }
static Zabbix()
{
Sender = new ZabbixSender(Config.ZabbixServer, Config.ZabbixPort);
}
}
struct ZabbixItem
{
public string Host;
public string Key;
public string Value;
}
class ZabbixSender
{
internal struct SendItem
{
// ReSharper disable InconsistentNaming - Zabbix is case sensitive
public string host;
public string key;
public string value;
public string clock;
// ReSharper restore InconsistentNaming
}
#pragma warning disable 0649
internal struct ZabbixResponse
{
public string Response;
public string Info;
}
#pragma warning restore 0649
#region --- Constants ---
public const string DefaultHeader = "ZBXD\x01";
public const string SendRequest = "sender data";
public const int DefaultTimeout = 10000;
#endregion
#region --- Fields ---
private readonly DateTime _dtUnixMinTime = DateTime.SpecifyKind(new DateTime(1970, 1, 1), DateTimeKind.Utc);
private readonly int _timeout;
private readonly string _zabbixserver;
private readonly int _zabbixport;
#endregion
#region --- Constructors ---
public ZabbixSender(string zabbixserver, int zabbixport)
: this(zabbixserver, zabbixport, DefaultTimeout)
{
}
public ZabbixSender(string zabbixserver, int zabbixport, int timeout)
{
_zabbixserver = zabbixserver;
_zabbixport = zabbixport;
_timeout = timeout;
}
#endregion
#region --- Methods ---
public string SendData(ZabbixItem itm)
{
return SendData(new List<ZabbixItem>(1) { itm });
}
public string SendData(List<ZabbixItem> lstData)
{
try
{
var serializer = new JavaScriptSerializer();
var values = new List<SendItem>(lstData.Count);
values.AddRange(lstData.Select(itm => new SendItem
{
host = itm.Host,
key = itm.Key,
value = itm.Value,
clock = Math.Floor((DateTime.Now.ToUniversalTime() - _dtUnixMinTime).TotalSeconds).ToString(CultureInfo.InvariantCulture)
}));
var json = serializer.Serialize(new
{
request = SendRequest,
data = values.ToArray()
});
var header = Encoding.ASCII.GetBytes(DefaultHeader);
var length = BitConverter.GetBytes((long)json.Length);
var data = Encoding.ASCII.GetBytes(json);
var packet = new byte[header.Length + length.Length + data.Length];
Buffer.BlockCopy(header, 0, packet, 0, header.Length);
Buffer.BlockCopy(length, 0, packet, header.Length, length.Length);
Buffer.BlockCopy(data, 0, packet, header.Length + length.Length, data.Length);
using (var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp))
{
socket.Connect(_zabbixserver, _zabbixport);
socket.Send(packet);
//Header
var buffer = new byte[5];
ReceivData(socket, buffer, 0, buffer.Length, _timeout);
if (DefaultHeader != Encoding.ASCII.GetString(buffer, 0, buffer.Length))
throw new Exception("Invalid header");
//Message length
buffer = new byte[8];
ReceivData(socket, buffer, 0, buffer.Length, _timeout);
var dataLength = BitConverter.ToInt32(buffer, 0);
if (dataLength == 0)
throw new Exception("Invalid data length");
//Message
buffer = new byte[dataLength];
ReceivData(socket, buffer, 0, buffer.Length, _timeout);
var response = serializer.Deserialize<ZabbixResponse>(Encoding.ASCII.GetString(buffer, 0, buffer.Length));
return string.Format("Response: {0}, Info: {1}", response.Response, response.Info);
}
}
catch (Exception e)
{
return string.Format("Exception: {0}", e);
}
}
private static void ReceivData(Socket pObjSocket, byte[] buffer, int offset, int size, int timeout)
{
var startTickCount = Environment.TickCount;
var received = 0;
do
{
if (Environment.TickCount > startTickCount + timeout)
throw new TimeoutException();
try
{
received += pObjSocket.Receive(buffer, offset + received, size - received, SocketFlags.None);
}
catch (SocketException ex)
{
if (ex.SocketErrorCode == SocketError.WouldBlock || ex.SocketErrorCode == SocketError.IOPending || ex.SocketErrorCode == SocketError.NoBufferSpaceAvailable)
Thread.Sleep(30);
else
throw;
}
} while (received < size);
}
#endregion
}
}
Конфиги… Пора уже делать хоть что-то интересное. Во-первых, в конфигах будем хранить ящики, которые мы опрашиваем. Во вторых настройки DataFlow. Предлагаю так:
Конфиги
<CredentialsList>
<credentials hostname="8.8.8.8" username="popka@example.com" password="123" port="110" type="fbl" />
<credentials hostname="8.8.8.8" username="kesha@example.com" password="123" port="110" type="bounce" />
</CredentialsList>
<DataFlowOptionsList>
<datablockoptions name="_sortMailDataBlock" maxdop="4" boundedcapacity="4" />
<datablockoptions name="_spamFilterDataBlock" maxdop="4" boundedcapacity="4" />
<datablockoptions name="_checkBounceDataBlock" maxdop="16" boundedcapacity="16" />
<datablockoptions name="_identifyDataBlock" maxdop="16" boundedcapacity="16" />
<datablockoptions name="_addToCrmDataBlock" maxdop="16" boundedcapacity="16" />
<datablockoptions name="_addToFblDataBlock" maxdop="16" boundedcapacity="16" />
<datablockoptions name="_addToBounceDataBlock" maxdop="16" boundedcapacity="16" />
</DataFlowOptionsList>
Итак, хост и порт куда конектится, юзер и пароль — тут всё понятно. Дальше тип ящика. Допустим, служба используется маркетингом (как и другими отделами). У них есть ящики, куда сваливаются автоответы на рассылки, а также отчеты о спаме FBL. Сам ящик уже категоризирует письмо, поэтому для таких ситуаций сразу задаем тип ящика. С настройками DataFlow будет понятно дальше, когда начнем создавать объекты. Тут у нас будут собственные секции в конфиге. Мануалов куча как это сделать, поэтому просто покажу результат:
Определяем типы
#region --- Types ---
static class MailboxType
{
public const string Bo = "bo";
public const string Crm = "crm";
public const string Fbl = "fbl";
public const string Bounce = "bounce";
}
class MailboxInfo
{
public string Type { get; set; }
public string Hostname { get; set; }
public string User { get; set; }
public string Password { get; set; }
public int Port { get; set; }
}
class DataBlockOptions
{
public int Maxdop { get; set; }
public int BoundedCapacity { get; set; }
public DataBlockOptions()
{
Maxdop = 1;
BoundedCapacity = 1;
}
}
#endregion
Создаем секции
/// <summary>
/// Custom config section
/// </summary>
public class CustomSettingsConfigSection : ConfigurationSection
{
[ConfigurationProperty("CredentialsList")]
public CredentialsCollection CredentialItems
{
get { return base["CredentialsList"] as CredentialsCollection; }
}
[ConfigurationProperty("DataFlowOptionsList")]
public DataBlockOptionsCollection DataFlowOptionsItems
{
get { return base["DataFlowOptionsList"] as DataBlockOptionsCollection; }
}
}
Учимся читать значения из этих секций
/// <summary>
/// Custom collection - credentials list
/// </summary>
[ConfigurationCollection(typeof(CredentialsElement), AddItemName = "credentials")]
public class CredentialsCollection : ConfigurationElementCollection, IEnumerable<CredentialsElement>
{
protected override ConfigurationElement CreateNewElement()
{
return new CredentialsElement();
}
protected override object GetElementKey(ConfigurationElement element)
{
return ((CredentialsElement)element).Username;
}
public CredentialsElement this[int index]
{
get { return BaseGet(index) as CredentialsElement; }
}
public new IEnumerator<CredentialsElement> GetEnumerator()
{
for (var i = 0; i < Count; i++)
{
yield return BaseGet(i) as CredentialsElement;
}
}
}
/// <summary>
/// Custom credentials item
/// </summary>
public class CredentialsElement : ConfigurationElement
{
[ConfigurationProperty("hostname", DefaultValue = "")]
public string Hostname
{
get { return base["hostname"] as string; }
}
[ConfigurationProperty("username", DefaultValue = "", IsKey = true)]
public string Username
{
get { return base["username"] as string; }
}
[ConfigurationProperty("password", DefaultValue = "")]
public string Password
{
get { return base["password"] as string; }
}
[ConfigurationProperty("type", DefaultValue = "")]
public string Type
{
get { return base["type"] as string; }
}
[ConfigurationProperty("port", DefaultValue = "")]
public string Port
{
get { return base["port"] as string; }
}
}
/// <summary>
/// Custom collection - DataBlock options list
/// </summary>
[ConfigurationCollection(typeof(DataBlockOptionsElement), AddItemName = "datablockoptions")]
public class DataBlockOptionsCollection : ConfigurationElementCollection, IEnumerable<DataBlockOptionsElement>
{
protected override ConfigurationElement CreateNewElement()
{
return new DataBlockOptionsElement();
}
protected override object GetElementKey(ConfigurationElement element)
{
return ((DataBlockOptionsElement)element).Name;
}
public CredentialsElement this[int index]
{
get { return BaseGet(index) as CredentialsElement; }
}
public new IEnumerator<DataBlockOptionsElement> GetEnumerator()
{
for (var i = 0; i < Count; i++)
{
yield return BaseGet(i) as DataBlockOptionsElement;
}
}
}
/// <summary>
/// Custom DataBlock options item
/// </summary>
public class DataBlockOptionsElement : ConfigurationElement
{
[ConfigurationProperty("name", DefaultValue = "", IsKey = true)]
public string Name
{
get { return base["name"] as string; }
}
[ConfigurationProperty("maxdop", DefaultValue = "")]
public string Maxdop
{
get { return base["maxdop"] as string; }
}
[ConfigurationProperty("boundedcapacity", DefaultValue = "")]
public string BoundedCapacity
{
get { return base["boundedcapacity"] as string; }
}
}
Полную реализацию конфига писать не буду, подразумевается, что в процессе разработки туда будут добавляться нужные нам параметры.
Наши кастомные настройки прочитаем так:
Читаем
public List<MailboxInfo> CredentialsList { get; private set; }
public Dictionary<string, DataBlockOptions> DataFlowOptionsList { get; private set; }
...
static Config()
{
try
{
var customConfig = (CustomSettingsConfigSection)ConfigurationManager.GetSection("CustomSettings");
//Get mailboxes
foreach (var item in customConfig.CredentialItems)
CredentialsList.Add(new MailboxInfo
{
Hostname = item.Hostname,
Port = Convert.ToInt32(item.Port),
User = item.Username,
Type = item.Type,
Password = item.Password
});
//Get DataFlow settings
foreach (var item in customConfig.DataFlowOptionsItems)
DataFlowOptionsList.Add(item.Name, new DataBlockOptions
{
Maxdop = Convert.ToInt32(item.Maxdop),
BoundedCapacity = Convert.ToInt32(item.BoundedCapacity)
});
}
catch (Exception ex)
{
Logger.Log.Fatal("Error at reading config: {0}", ex.Message);
throw;
}
}
Как-то очень затянуто получается, а мы даже не дошли до самого интересного.
Опустим пока обвязку из TopShelf, счетчики производительности, общение с БД и перейдем к делу! Создаем класс Crawler — ядро. Для начала читаем почту:
private volatile bool _stopPipeline;
...
public void Start()
{
do
{
var getMailsTasks = _config.CredentialsList.Select(credentials => Task.Run(() => GetMails(credentials))).ToList();
foreach (var task in getMailsTasks)
task.Wait();
Thread.Sleep(2000);
} while (!_stopPipeline);
//Stop pipeline - wait for completion of all endpoints
//Тут будет остановка DataFlow конвейера
if (_stopPipeline)
Logger.Log.Warn("Pipeline has been stopped by user");
}
Вот тут лень взяла свое и я решил не заморачиваться — если ящиков порядка 20-30 можно под каждый запустить таск и не париться о количестве потоков. (Разрешаю закидать помидорами.)
Переходим к самому чтению:
private void GetMails(MailboxInfo info)
{
try
{
using (var client = new Pop3Client())
{
Сразу посчитаем тайминги доступа к ящику — пригодится для диагностики сети и загруженности сервера.
//Get Zabbix metrics
var stopwatch = new Stopwatch();
stopwatch.Start();
//Get mail count
client.Connect(info.Hostname, info.Port, false);
client.Authenticate(info.User, info.Password);
stopwatch.Stop();
Отправляем данные в Zabbix. Всё просто — указываем имя хоста (как оно заведено в Zabbix), ключ (опять таки строго, как в Zabbix) и строковое значение.
//Send it to Zabbix
Zabbix.Sender.SendData(new ZabbixItem { Host = Config.HostKey, Key = info.Type + Config.TimingKey, Value = stopwatch.ElapsedMilliseconds.ToString() });
Logger.Log.Debug("Send [{0}] timing to Zabbix: connected to '{1}' as '{2}', timing {3}ms", info.Type, info.Hostname, info.User, stopwatch.ElapsedMilliseconds);
var count = client.GetMessageCount();
if (count == 0)
return;
Logger.Log.Debug("We've got new {0} messages in '{1}'", count, info.User);
//Send messages to sorting block
for (var i = 0; i < count; i++)
{
try
{
var mailInfo = new MessageInfo
{
IsSpam = false,
Mail = client.GetMessage(i + 1),
Type = MessageType.UNKNOWN,
Subtype = null,
Recipient = null,
Mailbox = info
};
Logger.Log.Debug("Download message from '{0}'. Size: {1}b", info.User, mailInfo.Mail.RawMessage.Length);
DataFlow pipeline будет создана при создании класса Crawler. Считаем, что наш первый этап — отсортировать письмо.
while (!_sortMailDataBlock.Post(mailInfo))
Thread.Sleep(500);
Видите, как просто — сам конвейер один. Все таски, читающие почту, кидают туда сообщения по одному. Если блок занят, Post вернет false и мы просто подождем пока он не освободится. Текущий потом в это время продолжает работать. Вот это я называю параллелизм без забот.
Сообщение ушло на конвейер, теперь его можно со спокойной душой сохранить в RAW архив (да-да! всё, что читаем — сохраняем в файловый архив. Служба поддержки нам потом скажет спасибо).
Настроим, например, ротацию архива:
NLog.config
<targets>
<!-- add your targets here -->
<target name="logfile" xsi:type="File" fileName="${basedir}\logs\${shortdate}-message.log" />
<target name="Archivefile" xsi:type="File" fileName="${basedir}\archive\${shortdate}-archive.dat" />
</targets>
Потом на него можно натравить logStash, но это уже другая история…
//Save every mail to archive
Logger.Log.Debug("Archive message");
Logger.Archive.Info(Functions.MessageToString(mailInfo.Mail));
}
catch (Exception ex)
{
Logger.Log.Error("Parse email error: {0}", ex.Message);
Functions.ErrorsCounters[info.Type].Increment();
//Archive mail anyway
Logger.Log.Debug("Archive message");
Logger.Archive.Info(Encoding.Default.GetString(client.GetMessageAsBytes(i + 1)));
}
if (_config.DeleteMail)
client.DeleteMessage(i + 1);
if (_stopPipeline)
break;
}
Logger.Log.Debug("Done with '{0}'", info.User);
}
}
catch (Exception ex)
{
Logger.Log.Error("General error - type: {0}, message: {1}", ex, ex.Message);
Functions.ErrorsCounters[info.Type].Increment();
}
}
Здесь мы использовали статические счетчики ошибок (в разрезе типов ящиков), где ErrorsCounters — это:
public static Dictionary<string, Counter> ErrorsCounters = new Dictionary<string, Counter>();
А сами счетчики можно сделать так:
Counter.cs
class Counter
{
private long _counter;
public Counter()
{
_counter = 0;
}
public void Increment()
{
Interlocked.Increment(ref _counter);
}
public long Read()
{
return _counter;
}
public long Refresh()
{
return Interlocked.Exchange(ref _counter, 0);
}
public void Add(long value)
{
Interlocked.Add(ref _counter, value);
}
public void Set(long value)
{
Interlocked.Exchange(ref _counter, value);
}
}
Перейдем к созданию конвейера. Допустим, у нас есть ящики, куда сыпятся автоответы. Такие письма надо распарсить (что за автоответ, от кого, по какой рассылке и т.д.) и сложить результат в хранилище (БД). Допустим, есть ящики, куда падают FBL отчеты. Такие письма сразу складываем в базу. Все прочие письма считаем «полезными» — их надо проверить на спам и отправить во внешнюю систему, например, CRM.
Как вы уже поняли, данный пример в основном рассматривает применение сборщика для задач маркетинга — сбор статистики по доставке почты, информация о спаме.
Итак, мы определились с рабочим потоком. Объявляем необходимые блоки в классе Crawler:
class MessageInfo
{
public bool IsSpam { get; set; }
public Message Mail { get; set; }
public string Subtype { get; set; }
public string Recipient { get; set; }
public MessageType Type { get; set; }
public MailboxInfo Mailbox { get; set; }
}
class Crawler
{
//Pipeline
private TransformBlock<MessageInfo, MessageInfo> _sortMailDataBlock;
private TransformBlock<MessageInfo, MessageInfo> _spamFilterDataBlock;
private TransformBlock<MessageInfo, MessageInfo> _checkBounceDataBlock;
private TransformBlock<MessageInfo, MessageInfo> _identifyDataBlock;
private ActionBlock<MessageInfo> _addToCrmDataBlock;
private ActionBlock<MessageInfo> _addToFblDataBlock;
private ActionBlock<MessageInfo> _addToBounceDataBlock;
...
Создаем метод инициализации и создаем блоки конвейера (для инициализации блоков используем наши замечательные секции из конфигов):
public void Init()
{
//*** Create pipeline ***
//Create TransformBlock to get message type
var blockOptions = _config.GetDataBlockOptions("_sortMailDataBlock");
_sortMailDataBlock = new TransformBlock<MessageInfo, MessageInfo>(mail => SortMail(mail),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = blockOptions.Maxdop,
BoundedCapacity = blockOptions.BoundedCapacity
});
//Create TransformBlock to filter spam
blockOptions = _config.GetDataBlockOptions("_spamFilterDataBlock");
_spamFilterDataBlock = new TransformBlock<MessageInfo, MessageInfo>(mail => FilterSpam(mail),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = blockOptions.Maxdop,
BoundedCapacity = blockOptions.BoundedCapacity
});
//Create TransformBlock to sort bounces
blockOptions = _config.GetDataBlockOptions("_checkBounceDataBlock");
_checkBounceDataBlock = new TransformBlock<MessageInfo, MessageInfo>(mail => BounceTypeCheck(mail),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = blockOptions.Maxdop,
BoundedCapacity = blockOptions.BoundedCapacity
});
//Create TransformBlock to identify bounce owner
blockOptions = _config.GetDataBlockOptions("_identifyDataBlock");
_identifyDataBlock = new TransformBlock<MessageInfo, MessageInfo>(mail => GetRecipient(mail),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = blockOptions.Maxdop,
BoundedCapacity = blockOptions.BoundedCapacity
});
//Create ActionBlock to send mail to CRM
blockOptions = _config.GetDataBlockOptions("_addToCrmDataBlock");
_addToCrmDataBlock = new ActionBlock<MessageInfo>(mail => AddToCrm(mail),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = blockOptions.Maxdop,
BoundedCapacity = blockOptions.BoundedCapacity
});
//Create ActionBlock to send FBL to MailWH
blockOptions = _config.GetDataBlockOptions("_addToFblDataBlock");
_addToFblDataBlock = new ActionBlock<MessageInfo>(mail => AddToFbl(mail),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = blockOptions.Maxdop,
BoundedCapacity = blockOptions.BoundedCapacity
});
//Create ActionBlock to send Bounce to MailWH
blockOptions = _config.GetDataBlockOptions("_addToBounceDataBlock");
_addToBounceDataBlock = new ActionBlock<MessageInfo>(mail => AddToBounce(mail),
new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = blockOptions.Maxdop,
BoundedCapacity = blockOptions.BoundedCapacity
});
Собираем конвейер в соответствии с нашей схемой:
//*** Build pipeline ***
_sortMailDataBlock.LinkTo(_spamFilterDataBlock, info => info.Type == MessageType.GENERAL);
_sortMailDataBlock.LinkTo(_addToFblDataBlock, info => info.Type == MessageType.FBL);
_sortMailDataBlock.LinkTo(_checkBounceDataBlock, info => info.Type == MessageType.BOUNCE);
_sortMailDataBlock.LinkTo(DataflowBlock.NullTarget<MessageInfo>(), info => info.Type == MessageType.UNKNOWN); /*STUB*/
_checkBounceDataBlock.LinkTo(_identifyDataBlock);
_identifyDataBlock.LinkTo(_addToBounceDataBlock);
_spamFilterDataBlock.LinkTo(_addToCrmDataBlock, info => !info.IsSpam);
_spamFilterDataBlock.LinkTo(DataflowBlock.NullTarget<MessageInfo>(), info => info.IsSpam); /*STUB*/
Как видим, всё предельно просто — связываем блок со следующим (с возможностью задания условия связи). Все блоки исполняются параллельно. Каждый блок имеет степень параллелизма и емкость (с помощью емкости можно регулировать очередь перед блоком, то есть блок сообщение принял, но еще не обрабатывает). Таким образом, можно задавать высокую степень параллелизма для «сложных» и долгих операций, как, например, парсинг содержимого письма.
Не буду описывать матчасть DataFlow, лучше всё прочесть в первоисточнике TPL DataFlow.
Далее задаем правила выхода из блока:
_sortMailDataBlock.Completion.ContinueWith(t =>
{
if (t.IsFaulted) ((IDataflowBlock)_spamFilterDataBlock).Fault(t.Exception);
else _spamFilterDataBlock.Complete();
});
_sortMailDataBlock.Completion.ContinueWith(t =>
{
if (t.IsFaulted) ((IDataflowBlock)_addToFblDataBlock).Fault(t.Exception);
else _addToFblDataBlock.Complete();
});
_sortMailDataBlock.Completion.ContinueWith(t =>
{
if (t.IsFaulted) ((IDataflowBlock)_checkBounceDataBlock).Fault(t.Exception);
else _checkBounceDataBlock.Complete();
});
_spamFilterDataBlock.Completion.ContinueWith(t =>
{
if (t.IsFaulted) ((IDataflowBlock)_addToCrmDataBlock).Fault(t.Exception);
else _addToCrmDataBlock.Complete();
});
_checkBounceDataBlock.Completion.ContinueWith(t =>
{
if (t.IsFaulted) ((IDataflowBlock)_identifyDataBlock).Fault(t.Exception);
else _identifyDataBlock.Complete();
});
_identifyDataBlock.Completion.ContinueWith(t =>
{
if (t.IsFaulted) ((IDataflowBlock)_addToBounceDataBlock).Fault(t.Exception);
else _addToBounceDataBlock.Complete();
});
}
Всё, на самом деле конвейер уже работает, можно постить в него сообщения. Осталось только остановить его дополнив наш метод Start:
Start
public void Start()
{
do
{
var getMailsTasks = _config.CredentialsList.Select(credentials => Task.Run(() => GetMails(credentials))).ToList();
foreach (var task in getMailsTasks)
task.Wait();
Thread.Sleep(2000);
} while (!_stopPipeline);
//Stop pipeline - wait for completion of all endpoints
_sortMailDataBlock.Complete();
_addToCrmDataBlock.Completion.Wait();
_addToFblDataBlock.Completion.Wait();
_addToBounceDataBlock.Completion.Wait();
if (_stopPipeline)
Logger.Log.Warn("Pipeline has been stopped by user");
}
Переходим к делегатам.
Сортировка… Ну, допустим у нас всё просто (усложнить то всегда успеем):
private MessageInfo SortMail(MessageInfo mail)
{
switch (mail.Mailbox.Type)
{
case MailboxType.Crm:
mail.Type = MessageType.GENERAL;
break;
case MailboxType.Bounce:
mail.Type = MessageType.BOUNCE;
break;
case MailboxType.Fbl:
mail.Type = MessageType.FBL;
break;
}
return mail;
}
Спам фильтр. Это на домашнюю работу — используйте SpamAssassin.
Вот вам делегат:
private MessageInfo FilterSpam(MessageInfo mail)
{
//TODO: Add SpamAssassin logic
return mail;
}
И классы для работы с API SpamAssassin (ссылка на проект).
А мы переходим к парсингу писем. Парсим мы автоответы. Тут вступает в дело MEF.
Создаем проект (dll) с интерфейсами для наших плагинов (Назовем Interfaces).
Добавляем интерфейс:
public interface ICondition
{
string Check(Message mimeMessage);
}
public interface IConditionMetadata
{
Type Type { get; }
}
И… всё. Наш TopCrawler зависит от этого проекта и проект с плагинами тоже будет использовать его.
Создаем новый проект (тоже dll), назовем Conditions.
Добавим типы автоответов:
#region --- Types ---
static class BounceType
{
public const string Full = "BounceTypeFull";
public const string Timeout = "BounceTypeTimeout";
public const string Refused = "BounceTypeRefused";
public const string NotFound = "BounceTypeNotFound";
public const string Inactive = "BounceTypeInactive";
public const string OutOfOffice = "BounceTypeOutOfOffice";
public const string HostNotFound = "BounceTypeHostNotFound";
public const string NotAuthorized = "BounceTypeNotAuthorized";
public const string ManyConnections = "BounceTypeManyConnections";
}
#endregion
И классы, реализующие наш интерфейс:
[Export(typeof(ICondition))]
[ExportMetadata("Type", typeof(ConditionNotFound1))]
public class ConditionNotFound1 : ICondition
{
public string Check(Message mimeMessage)
{
if (!mimeMessage.MessagePart.IsMultiPart)
return null;
const string pattern = "Diagnostic-Code:.+smtp.+550";
var regexp = new Regex(pattern, RegexOptions.IgnoreCase);
return mimeMessage.MessagePart.MessageParts.Any(part => part.ContentType.MediaType == "message/delivery-status" && regexp.IsMatch(part.GetBodyAsText())) ? BounceType.NotFound : null;
}
}
...
[Export(typeof(ICondition))]
[ExportMetadata("Type", typeof(ConditionTimeout2))]
public class ConditionTimeout2 : ICondition
{
return BounceType.Timeout;
}
...
Как вы заметилиб всё дело в атрибутах. С помощью них плагины и будут загружены.
Возвращаемся к нашему проекту и загружаем плагины:
class Crawler
{
...
//Plugins
[ImportMany]
public IEnumerable<Lazy<ICondition, IConditionMetadata>> BounceTypeConditions { get; set; }
private void LoadPlugins()
{
try
{
var container = new CompositionContainer(new DirectoryCatalog(_config.PluginDirectory), true);
container.ComposeParts(this);
}
catch (Exception ex)
{
Logger.Log.Error("Unable to load plugins: {0}", ex.Message);
}
}
...
LoadPlugins дергаем в конструкторе нашего класса. Объяснять подробно про механизм загрузки не буду — гугл справится лучше.
Переходим к нашему делегату проверки типа Bounce. Условия будут применяться по очереди, пока не сработает первое — исключающий метод:
private MessageInfo BounceTypeCheck(MessageInfo mailInfo)
{
try
{
foreach (var condition in BounceTypeConditions)
{
var res = condition.Value.Check(mailInfo.Mail);
if (res == null)
continue;
mailInfo.Subtype = res;
Logger.Log.Debug("Bounce type condition [{0}] triggered for message [{1}]", condition.Metadata.Type, mailInfo.Mail.Headers.MessageId);
break;
}
}
catch (Exception ex)
{
Logger.Log.Error("Failed to determine bounce type for message '{0}': {1}", mailInfo.Mail.Headers.MessageId, ex.Message);
Logger.ErrorsCounters[MailboxType.Bounce].Increment();
}
return mailInfo;
}
Таким образомб если появляется новая логикаб достаточно просто добавить в проект с плагинами новый класс, реализующий наш интерфейс и — вуаля! Пример второго плагина по определению отправителя письма прикладывать не буду — итак уже длинный пост (Автоответ сгенерировал сам сервер, поэтому отправителя тоже надо распарсить из заголовков письма).
С записью результатов в БД тоже ничего необычного. Например, так:
private void AddToBounce(MessageInfo mail)
{
try
{
MailWH.BounceAdd(mail);
Functions.ProcessedCounters[MailboxType.Bounce].Increment();
Functions.Log.Debug("Send Bounce to MailWH");
}
catch (Exception ex)
{
Functions.Log.Error("Error saving Bounce message '{0}' to MailWH: {1}", mail.Mail.Headers.MessageId, ex.Message);
Functions.ErrorsCounters[MailboxType.Bounce].Increment();
}
}
BounceAdd
public static long BounceAdd(MessageInfo message)
{
using (var conn = new SqlConnection(ConnectionString))
using (var cmd = new SqlDataAdapter("BounceAdd", conn))
{
var body = message.Mail.FindFirstPlainTextVersion() == null
? message.Mail.FindFirstHtmlVersion().GetBodyAsText()
: message.Mail.FindFirstPlainTextVersion().GetBodyAsText();
var outId = new SqlParameter("@ID", SqlDbType.BigInt)
{
Direction = ParameterDirection.Output
};
cmd.SelectCommand.CommandType = CommandType.StoredProcedure;
cmd.SelectCommand.Parameters.Add(new SqlParameter("@RawMessage", message.Mail.RawMessage));
cmd.SelectCommand.Parameters.Add(new SqlParameter("@Message", body));
cmd.SelectCommand.Parameters.Add(new SqlParameter("@Subject", message.Mail.Headers.Subject ?? ""));
cmd.SelectCommand.Parameters.Add(new SqlParameter("@MessageID", message.Mail.Headers.MessageId ?? ""));
cmd.SelectCommand.Parameters.Add(new SqlParameter("@AddressTo", message.Mail.Headers.To[0].Address ?? ""));
cmd.SelectCommand.Parameters.Add(new SqlParameter("@AddressFrom", message.Mail.Headers.From.Address ?? ""));
cmd.SelectCommand.Parameters.Add(new SqlParameter("@DateRecieved", DateTime.Now));
cmd.SelectCommand.Parameters.Add(new SqlParameter("@BounceTypeSysName", (object)message.Subtype ?? DBNull.Value));
cmd.SelectCommand.Parameters.Add(new SqlParameter("@SourceFrom", (object)message.Recipient ?? DBNull.Value));
// TODO: Add ListId support
cmd.SelectCommand.Parameters.Add(new SqlParameter("@ListId", DBNull.Value));
cmd.SelectCommand.Parameters.Add(outId);
conn.Open();
cmd.SelectCommand.ExecuteNonQuery();
return outId.Value as long? ?? 0;
}
}
Простите, что не успел показать TopShelf — пост и так уже слишком раздулся.
Выводы
В этом уроке мы узнали, что задача сбора почты может оказаться не такой простой. Разработанное ядро позволяет быстро добавлять новые шаги процесса — DataFlow-блоки, не затрагивая существующую логику. Подсистема плагинов позволяет быстро наращивать скриптоподобную логику парсинга, а сам DataFlow распараллеливает все вычисления (причем мы имеем возможность гибко настраивать многопоточность под конкретную машину). TopShelf дает нам возможность запускать сервис как в режиме службы, так и в консольном режиме для облегчения отладки.
Фух… Если будет интересно, дальше расскажу, как поставить это на рельсы Continious Integration, настроить автобилды и выпуск релиза через VS Release Management.
lorc
Фу такое делать. Почему бы не завести банальный семафор или мютекс и не блокироваться на нём внутри Post()?
risedphantom Автор
Согласен — не очень красиво. Как-то «по-школьному», но переписывать DataFlow Post не очень хочется. Решил пожертвовать одним активным потоком :)